diff --git a/.github/config/chunks.yaml b/.github/config/chunks.yaml index d29a863959..d654ec72da 100644 --- a/.github/config/chunks.yaml +++ b/.github/config/chunks.yaml @@ -18,7 +18,7 @@ chunks: common: name: Common workflow: chunk-common.yaml - modules: [ core, configuration, sketches, parquet, common/common-job, common/common-task, build, common/dynamodb-tools ] + modules: [ core, configuration, sketches, parquet, common/common-job, common/common-task, common/common-invoke-tables, build, common/dynamodb-tools ] compaction: name: Compaction workflow: chunk-compaction.yaml diff --git a/.github/workflows/chunk-cdk.yaml b/.github/workflows/chunk-cdk.yaml index 867515d3b7..07ffa5111d 100644 --- a/.github/workflows/chunk-cdk.yaml +++ b/.github/workflows/chunk-cdk.yaml @@ -14,10 +14,10 @@ on: - 'java/cdk-environment/**' - 'java/system-test/system-test-cdk/**' - 'java/system-test/system-test-configuration/**' + - 'java/system-test/system-test-data-generation/**' - 'java/system-test/system-test-dsl/**' - 'java/system-test/system-test-drivers/**' - 'java/system-test/system-test-suite/**' - - 'java/system-test/system-test-data-generation/**' - 'java/bulk-import/bulk-import-starter/**' - 'java/clients/**' - 'java/ingest/ingest-batcher-submitter/**' @@ -34,6 +34,7 @@ on: - 'java/ingest/ingest-runner/**' - 'java/compaction/compaction-status-store/**' - 'java/compaction/compaction-job-creation/**' + - 'java/common/common-invoke-tables/**' - 'java/common/common-job/**' - 'java/ingest/ingest-status-store/**' - 'java/sketches/**' diff --git a/.github/workflows/chunk-clients.yaml b/.github/workflows/chunk-clients.yaml index bb2b4f58ae..b8a4f335c0 100644 --- a/.github/workflows/chunk-clients.yaml +++ b/.github/workflows/chunk-clients.yaml @@ -13,6 +13,7 @@ on: - 'java/compaction/compaction-status-store/**' - 'java/splitter/splitter-core/**' - 'java/ingest/ingest-batcher-store/**' + - 'java/common/common-invoke-tables/**' - 'java/common/common-task/**' - 'java/ingest/ingest-batcher-core/**' - 'java/compaction/compaction-core/**' diff --git a/.github/workflows/chunk-common.yaml b/.github/workflows/chunk-common.yaml index 3b4eaf3253..7c3203fd08 100644 --- a/.github/workflows/chunk-common.yaml +++ b/.github/workflows/chunk-common.yaml @@ -15,6 +15,7 @@ on: - 'java/parquet/**' - 'java/common/common-job/**' - 'java/common/common-task/**' + - 'java/common/common-invoke-tables/**' - 'java/build/**' - 'java/common/dynamodb-tools/**' diff --git a/.github/workflows/chunk-compaction.yaml b/.github/workflows/chunk-compaction.yaml index 62d02a2977..4ebb2b081b 100644 --- a/.github/workflows/chunk-compaction.yaml +++ b/.github/workflows/chunk-compaction.yaml @@ -19,6 +19,7 @@ on: - 'java/splitter/splitter-core/**' - 'java/splitter/splitter-lambda/**' - 'java/common/common-task/**' + - 'java/common/common-invoke-tables/**' - 'java/ingest/ingest-runner/**' - 'java/common/common-job/**' - 'java/ingest/ingest-status-store/**' diff --git a/.github/workflows/chunk-data.yaml b/.github/workflows/chunk-data.yaml index fe15b910c1..0cfdf66537 100644 --- a/.github/workflows/chunk-data.yaml +++ b/.github/workflows/chunk-data.yaml @@ -8,12 +8,13 @@ on: - 'code-style/checkstyle*.xml' - 'code-style/spotbugs*.xml' - 'java/pom.xml' - - 'java/metrics/**' - 'java/garbage-collector/**' + - 'java/metrics/**' + - 'java/common/common-invoke-tables/**' - 'java/statestore/**' - - 'java/common/dynamodb-tools/**' - 'java/parquet/**' - 'java/configuration/**' + - 'java/common/dynamodb-tools/**' - 'java/core/**' jobs: diff --git a/java/clients/pom.xml b/java/clients/pom.xml index 1581690842..ce9125cfd0 100644 --- a/java/clients/pom.xml +++ b/java/clients/pom.xml @@ -136,6 +136,11 @@ ingest-batcher-store ${project.parent.version} + + sleeper + common-invoke-tables + ${project.parent.version} + sleeper diff --git a/java/clients/src/main/java/sleeper/clients/status/update/TriggerGarbageCollectionClient.java b/java/clients/src/main/java/sleeper/clients/status/update/TriggerGarbageCollectionClient.java index b69a9011ec..a9fcbb7115 100644 --- a/java/clients/src/main/java/sleeper/clients/status/update/TriggerGarbageCollectionClient.java +++ b/java/clients/src/main/java/sleeper/clients/status/update/TriggerGarbageCollectionClient.java @@ -24,11 +24,8 @@ import sleeper.configuration.properties.instance.InstanceProperties; import sleeper.configuration.table.index.DynamoDBTableIndex; -import sleeper.core.table.InvokeForTableRequest; -import sleeper.core.table.InvokeForTableRequestSerDe; import sleeper.core.table.TableIndex; -import sleeper.core.table.TableNotFoundException; -import sleeper.core.table.TableStatus; +import sleeper.invoke.tables.InvokeForTables; import java.util.List; import java.util.stream.Stream; @@ -58,14 +55,8 @@ public static void main(String[] args) { InstanceProperties instanceProperties = new InstanceProperties(); instanceProperties.loadFromS3GivenInstanceId(s3Client, instanceId); TableIndex tableIndex = new DynamoDBTableIndex(instanceProperties, dynamoClient); - List tables = tableNames.stream() - .map(name -> tableIndex.getTableByName(name) - .orElseThrow(() -> TableNotFoundException.withTableName(name))) - .collect(toUnmodifiableList()); String queueUrl = instanceProperties.get(GARBAGE_COLLECTOR_QUEUE_URL); - InvokeForTableRequestSerDe serDe = new InvokeForTableRequestSerDe(); - InvokeForTableRequest.forTables(tables.stream(), tables.size(), - request -> sqsClient.sendMessage(queueUrl, serDe.toJson(request))); + InvokeForTables.sendOneMessagePerTableByName(sqsClient, queueUrl, tableIndex, tableNames); } finally { s3Client.shutdown(); dynamoClient.shutdown(); diff --git a/java/common/common-invoke-tables/pom.xml b/java/common/common-invoke-tables/pom.xml new file mode 100644 index 0000000000..24db271ea9 --- /dev/null +++ b/java/common/common-invoke-tables/pom.xml @@ -0,0 +1,75 @@ + + + + + + common + sleeper + 0.23.0-SNAPSHOT + + + 4.0.0 + + common-invoke-tables + + + + com.amazonaws + aws-java-sdk-sqs + ${aws-java-sdk.version} + + + + sleeper + core + ${project.parent.version} + + + + sleeper + core + ${project.parent.version} + test-jar + test + + + sleeper + configuration + ${project.parent.version} + test-jar + test + + + org.testcontainers + testcontainers + test + + + org.testcontainers + localstack + test + + + org.testcontainers + junit-jupiter + test + + + + \ No newline at end of file diff --git a/java/common/common-invoke-tables/src/main/java/sleeper/invoke/tables/InvokeForTables.java b/java/common/common-invoke-tables/src/main/java/sleeper/invoke/tables/InvokeForTables.java new file mode 100644 index 0000000000..70beac9f10 --- /dev/null +++ b/java/common/common-invoke-tables/src/main/java/sleeper/invoke/tables/InvokeForTables.java @@ -0,0 +1,63 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sleeper.invoke.tables; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.SendMessageBatchRequest; +import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry; + +import sleeper.core.table.TableIndex; +import sleeper.core.table.TableNotFoundException; +import sleeper.core.table.TableStatus; +import sleeper.core.util.SplitIntoBatches; + +import java.util.List; +import java.util.UUID; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toUnmodifiableList; + +public class InvokeForTables { + + private InvokeForTables() { + } + + public static void sendOneMessagePerTable(AmazonSQS sqsClient, String queueUrl, Stream tables) { + // Limit to stay under the maximum size for an SQS sendMessageBatch call. + SplitIntoBatches.reusingListOfSize(10, tables, + batch -> sendMessageBatch(sqsClient, queueUrl, batch)); + } + + public static void sendOneMessagePerTableByName( + AmazonSQS sqsClient, String queueUrl, TableIndex tableIndex, List tableNames) { + List tables = tableNames.stream().map(name -> tableIndex.getTableByName(name) + .orElseThrow(() -> TableNotFoundException.withTableName(name))) + .collect(toUnmodifiableList()); + sendOneMessagePerTable(sqsClient, queueUrl, tables.stream()); + } + + private static void sendMessageBatch(AmazonSQS sqsClient, String queueUrl, List tablesBatch) { + sqsClient.sendMessageBatch(new SendMessageBatchRequest() + .withQueueUrl(queueUrl) + .withEntries(tablesBatch.stream() + .map(table -> new SendMessageBatchRequestEntry() + .withMessageDeduplicationId(UUID.randomUUID().toString()) + .withId(table.getTableUniqueId()) + .withMessageGroupId(table.getTableUniqueId()) + .withMessageBody(table.getTableUniqueId())) + .collect(toUnmodifiableList()))); + } +} diff --git a/java/common/common-invoke-tables/src/test/java/sleeper/invoke/tables/InvokeForTablesIT.java b/java/common/common-invoke-tables/src/test/java/sleeper/invoke/tables/InvokeForTablesIT.java new file mode 100644 index 0000000000..e9c27584f0 --- /dev/null +++ b/java/common/common-invoke-tables/src/test/java/sleeper/invoke/tables/InvokeForTablesIT.java @@ -0,0 +1,154 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sleeper.invoke.tables; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.AmazonSQSClientBuilder; +import com.amazonaws.services.sqs.model.CreateQueueRequest; +import com.amazonaws.services.sqs.model.CreateQueueResult; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.ReceiveMessageRequest; +import com.amazonaws.services.sqs.model.ReceiveMessageResult; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import sleeper.core.CommonTestConstants; +import sleeper.core.table.InMemoryTableIndex; +import sleeper.core.table.TableIndex; +import sleeper.core.table.TableNotFoundException; + +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toUnmodifiableList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static sleeper.configuration.testutils.LocalStackAwsV1ClientHelper.buildAwsV1Client; +import static sleeper.core.table.TableStatusTestHelper.uniqueIdAndName; + +@Testcontainers +public class InvokeForTablesIT { + + @Container + public static LocalStackContainer localStackContainer = new LocalStackContainer(DockerImageName.parse(CommonTestConstants.LOCALSTACK_DOCKER_IMAGE)) + .withServices(LocalStackContainer.Service.SQS); + + private final AmazonSQS sqsClient = buildAwsV1Client(localStackContainer, LocalStackContainer.Service.SQS, AmazonSQSClientBuilder.standard()); + + @Test + void shouldSendOneMessage() { + // Given + String queueUrl = createFifoQueueGetUrl(); + + // When + InvokeForTables.sendOneMessagePerTable(sqsClient, queueUrl, Stream.of( + uniqueIdAndName("table-id", "table-name"))); + + // Then + assertThat(receiveTableIdMessages(queueUrl, 2)) + .containsExactly("table-id"); + } + + @Test + void shouldSendMoreMessagesThanFitInAnSqsSendMessageBatch() { + // Given a FIFO queue + String queueUrl = createFifoQueueGetUrl(); + + // When we send more than the SQS hard limit of 10 messages to send in a single batch + InvokeForTables.sendOneMessagePerTable(sqsClient, queueUrl, + IntStream.rangeClosed(1, 11) + .mapToObj(i -> uniqueIdAndName("table-id-" + i, "table-name-" + i))); + + // Then we can receive those messages + assertThat(receiveTableIdMessages(queueUrl, 10)).containsExactly( + "table-id-1", "table-id-2", "table-id-3", "table-id-4", "table-id-5", + "table-id-6", "table-id-7", "table-id-8", "table-id-9", "table-id-10"); + assertThat(receiveTableIdMessages(queueUrl, 10)).containsExactly( + "table-id-11"); + } + + @Test + void shouldLookUpTableByName() { + // Given + String queueUrl = createFifoQueueGetUrl(); + TableIndex tableIndex = new InMemoryTableIndex(); + tableIndex.create(uniqueIdAndName("table-id", "table-name")); + + // When + InvokeForTables.sendOneMessagePerTableByName(sqsClient, queueUrl, tableIndex, List.of("table-name")); + + // Then + assertThat(receiveTableIdMessages(queueUrl, 2)) + .containsExactly("table-id"); + } + + @Test + void shouldFailLookUpTableByName() { + // Given + String queueUrl = createFifoQueueGetUrl(); + TableIndex tableIndex = new InMemoryTableIndex(); + + // When / Then + assertThatThrownBy(() -> InvokeForTables.sendOneMessagePerTableByName( + sqsClient, queueUrl, tableIndex, List.of("missing-table"))) + .isInstanceOf(TableNotFoundException.class); + assertThat(receiveTableIdMessages(queueUrl, 1)) + .isEmpty(); + } + + @Test + void shouldFailLookUpTableByNameOnSecondPage() { + // Given + String queueUrl = createFifoQueueGetUrl(); + TableIndex tableIndex = new InMemoryTableIndex(); + IntStream.rangeClosed(1, 11) + .mapToObj(i -> uniqueIdAndName("table-id-" + i, "table-name-" + i)) + .forEach(tableIndex::create); + + // When / Then + assertThatThrownBy(() -> InvokeForTables.sendOneMessagePerTableByName(sqsClient, queueUrl, tableIndex, + IntStream.rangeClosed(1, 12) + .mapToObj(i -> "table-name-" + i) + .collect(toUnmodifiableList()))) + .isInstanceOf(TableNotFoundException.class); + assertThat(receiveTableIdMessages(queueUrl, 10)) + .isEmpty(); + } + + private String createFifoQueueGetUrl() { + CreateQueueResult result = sqsClient.createQueue(new CreateQueueRequest() + .withQueueName(UUID.randomUUID().toString() + ".fifo") + .withAttributes(Map.of("FifoQueue", "true"))); + return result.getQueueUrl(); + } + + private List receiveTableIdMessages(String queueUrl, int maxMessages) { + ReceiveMessageResult result = sqsClient.receiveMessage( + new ReceiveMessageRequest(queueUrl) + .withMaxNumberOfMessages(maxMessages) + .withWaitTimeSeconds(0)); + return result.getMessages().stream() + .map(Message::getBody) + .collect(toUnmodifiableList()); + } + +} diff --git a/java/common/pom.xml b/java/common/pom.xml index 8409bfdf62..e04ff6ce0b 100644 --- a/java/common/pom.xml +++ b/java/common/pom.xml @@ -29,6 +29,7 @@ common-job common-task + common-invoke-tables dynamodb-tools diff --git a/java/compaction/compaction-job-creation-lambda/pom.xml b/java/compaction/compaction-job-creation-lambda/pom.xml index e656f2c57e..b778867611 100644 --- a/java/compaction/compaction-job-creation-lambda/pom.xml +++ b/java/compaction/compaction-job-creation-lambda/pom.xml @@ -67,6 +67,11 @@ configuration ${project.parent.version} + + sleeper + common-invoke-tables + ${project.parent.version} + com.amazonaws diff --git a/java/compaction/compaction-job-creation-lambda/src/main/java/sleeper/compaction/job/creation/lambda/CreateCompactionJobsTriggerLambda.java b/java/compaction/compaction-job-creation-lambda/src/main/java/sleeper/compaction/job/creation/lambda/CreateCompactionJobsTriggerLambda.java index 1fc3992fa2..4f7bdf8ce3 100644 --- a/java/compaction/compaction-job-creation-lambda/src/main/java/sleeper/compaction/job/creation/lambda/CreateCompactionJobsTriggerLambda.java +++ b/java/compaction/compaction-job-creation-lambda/src/main/java/sleeper/compaction/job/creation/lambda/CreateCompactionJobsTriggerLambda.java @@ -24,8 +24,6 @@ import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; -import com.amazonaws.services.sqs.model.SendMessageBatchRequest; -import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,15 +31,11 @@ import sleeper.configuration.properties.instance.InstanceProperties; import sleeper.configuration.table.index.DynamoDBTableIndex; import sleeper.core.table.TableIndex; -import sleeper.core.table.TableStatus; import sleeper.core.util.LoggedDuration; -import sleeper.core.util.SplitIntoBatches; +import sleeper.invoke.tables.InvokeForTables; import java.time.Instant; -import java.util.List; -import java.util.UUID; -import static java.util.stream.Collectors.toUnmodifiableList; import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_CREATION_QUEUE_URL; import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET; @@ -72,25 +66,10 @@ public Void handleRequest(ScheduledEvent event, Context context) { String queueUrl = instanceProperties.get(COMPACTION_JOB_CREATION_QUEUE_URL); TableIndex tableIndex = new DynamoDBTableIndex(instanceProperties, dynamoClient); - SplitIntoBatches.reusingListOfSize(10, - tableIndex.streamOnlineTables(), - tables -> sendMessageBatch(tables, queueUrl)); + InvokeForTables.sendOneMessagePerTable(sqsClient, queueUrl, tableIndex.streamOnlineTables()); Instant finishTime = Instant.now(); LOGGER.info("Lambda finished at {} (ran for {})", finishTime, LoggedDuration.withFullOutput(startTime, finishTime)); return null; } - - private void sendMessageBatch(List tables, String queueUrl) { - sqsClient.sendMessageBatch(new SendMessageBatchRequest() - .withQueueUrl(queueUrl) - .withEntries(tables.stream() - .map(table -> new SendMessageBatchRequestEntry() - .withMessageDeduplicationId(UUID.randomUUID().toString()) - .withId(table.getTableUniqueId()) - .withMessageGroupId(table.getTableUniqueId()) - .withMessageBody(table.getTableUniqueId())) - .collect(toUnmodifiableList()))); - } - } diff --git a/java/core/src/main/java/sleeper/core/table/InvokeForTableRequest.java b/java/core/src/main/java/sleeper/core/table/InvokeForTableRequest.java deleted file mode 100644 index 2689f33f89..0000000000 --- a/java/core/src/main/java/sleeper/core/table/InvokeForTableRequest.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright 2022-2024 Crown Copyright - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package sleeper.core.table; - -import sleeper.core.util.SplitIntoBatches; - -import java.util.List; -import java.util.Objects; -import java.util.function.Consumer; -import java.util.stream.Stream; - -/** - * Model for an SQS message to invoke some operation for a batch of tables. Use {@link InvokeForTableRequestSerDe} to - * build SQS messages from objects of this class. - */ -public class InvokeForTableRequest { - - private final List tableIds; - - public InvokeForTableRequest(List tableIds) { - this.tableIds = tableIds; - } - - public List getTableIds() { - return tableIds; - } - - /** - * Creates batches of tables, and sends a request for each batch. - * - * @param tables a stream of tables to batch - * @param batchSize the batch size - * @param sendRequest a function to send a request - */ - public static void forTables(Stream tables, int batchSize, Consumer sendRequest) { - SplitIntoBatches.reusingListOfSize(batchSize, - tables.map(TableStatus::getTableUniqueId), - tableIds -> sendRequest.accept(new InvokeForTableRequest(tableIds))); - } - - /** - * Loads tables from the table index, creates batches of tables, and sends a request for each batch. - * - * @param offlineEnabled whether to include offline tables when creating batches - * @param tableIndex the table index - * @param batchSize the batch size - * @param sendRequest a function to send a request - */ - public static void forTablesWithOfflineEnabled( - boolean offlineEnabled, TableIndex tableIndex, int batchSize, Consumer sendRequest) { - forTables( - offlineEnabled ? tableIndex.streamAllTables() : tableIndex.streamOnlineTables(), - batchSize, sendRequest); - } - - @Override - public int hashCode() { - return Objects.hash(tableIds); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (!(obj instanceof InvokeForTableRequest)) { - return false; - } - InvokeForTableRequest other = (InvokeForTableRequest) obj; - return Objects.equals(tableIds, other.tableIds); - } - - @Override - public String toString() { - return "InvokeForTableRequest{tableIds=" + tableIds + "}"; - } - -} diff --git a/java/core/src/main/java/sleeper/core/table/InvokeForTableRequestSerDe.java b/java/core/src/main/java/sleeper/core/table/InvokeForTableRequestSerDe.java deleted file mode 100644 index e785734731..0000000000 --- a/java/core/src/main/java/sleeper/core/table/InvokeForTableRequestSerDe.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2022-2024 Crown Copyright - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package sleeper.core.table; - -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; - -/** - * Serialisation and deserialisation of SQS messages to invoke some operation for a batch of tables. - */ -public class InvokeForTableRequestSerDe { - - private final Gson gson = new GsonBuilder().create(); - - /** - * Serialises a request for a batch of tables to a JSON string. - * - * @param request the request - * @return the JSON string - */ - public String toJson(InvokeForTableRequest request) { - return gson.toJson(request); - } - - /** - * Deserialises a request for a batch of tables from a JSON string. - * - * @param json the JSON string - * @return the request - */ - public InvokeForTableRequest fromJson(String json) { - return gson.fromJson(json, InvokeForTableRequest.class); - } -} diff --git a/java/core/src/test/java/sleeper/core/table/InvokeForTableRequestTest.java b/java/core/src/test/java/sleeper/core/table/InvokeForTableRequestTest.java deleted file mode 100644 index f4eb193e3e..0000000000 --- a/java/core/src/test/java/sleeper/core/table/InvokeForTableRequestTest.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright 2022-2024 Crown Copyright - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package sleeper.core.table; - -import org.junit.jupiter.api.Test; - -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Stream; - -import static org.assertj.core.api.Assertions.assertThat; - -public class InvokeForTableRequestTest { - - private final InvokeForTableRequestSerDe serDe = new InvokeForTableRequestSerDe(); - - @Test - void shouldSendRequestForTwoTables() { - List sent = new ArrayList<>(); - InvokeForTableRequest.forTables( - Stream.of(table("table-1"), table("table-2")), - 1, request -> sent.add(serDe.toJson(request))); - assertThat(sent).extracting(serDe::fromJson).containsExactly( - new InvokeForTableRequest(List.of("table-1")), - new InvokeForTableRequest(List.of("table-2"))); - } - - @Test - void shouldSendBatchesOf2() { - List sent = new ArrayList<>(); - InvokeForTableRequest.forTables( - Stream.of(table("table-1"), table("table-2")), - 2, request -> sent.add(serDe.toJson(request))); - assertThat(sent).extracting(serDe::fromJson).containsExactly( - new InvokeForTableRequest(List.of("table-1", "table-2"))); - } - - @Test - void shouldSendRequestForOnlyOnlineTable() { - // Given - TableIndex tableIndex = new InMemoryTableIndex(); - tableIndex.create(table("offline-table").takeOffline()); - tableIndex.create(table("online-table")); - - // When - List sent = new ArrayList<>(); - InvokeForTableRequest.forTablesWithOfflineEnabled(false, tableIndex, - 1, request -> sent.add(serDe.toJson(request))); - - // Then - assertThat(sent).extracting(serDe::fromJson).containsExactly( - new InvokeForTableRequest(List.of("online-table"))); - } - - @Test - void shouldSendRequestForAllTablesWhenOfflineEnabled() { - // Given - TableIndex tableIndex = new InMemoryTableIndex(); - tableIndex.create(table("offline-table").takeOffline()); - tableIndex.create(table("online-table")); - - // When - List sent = new ArrayList<>(); - InvokeForTableRequest.forTablesWithOfflineEnabled(true, tableIndex, - 1, request -> sent.add(serDe.toJson(request))); - - // Then - assertThat(sent).extracting(serDe::fromJson).containsExactly( - new InvokeForTableRequest(List.of("offline-table")), - new InvokeForTableRequest(List.of("online-table"))); - } - - private TableStatus table(String tableName) { - return TableStatusTestHelper.uniqueIdAndName(tableName, tableName); - } -} diff --git a/java/garbage-collector/pom.xml b/java/garbage-collector/pom.xml index f2b2ec2c14..09dc6e14e0 100644 --- a/java/garbage-collector/pom.xml +++ b/java/garbage-collector/pom.xml @@ -84,6 +84,11 @@ statestore ${project.parent.version} + + sleeper + common-invoke-tables + ${project.parent.version} + sleeper diff --git a/java/garbage-collector/src/main/java/sleeper/garbagecollector/GarbageCollectorTriggerLambda.java b/java/garbage-collector/src/main/java/sleeper/garbagecollector/GarbageCollectorTriggerLambda.java index be27c6e001..6d3354bc14 100644 --- a/java/garbage-collector/src/main/java/sleeper/garbagecollector/GarbageCollectorTriggerLambda.java +++ b/java/garbage-collector/src/main/java/sleeper/garbagecollector/GarbageCollectorTriggerLambda.java @@ -24,8 +24,6 @@ import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; -import com.amazonaws.services.sqs.model.SendMessageBatchRequest; -import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,15 +31,11 @@ import sleeper.configuration.properties.instance.InstanceProperties; import sleeper.configuration.table.index.DynamoDBTableIndex; import sleeper.core.table.TableIndex; -import sleeper.core.table.TableStatus; import sleeper.core.util.LoggedDuration; -import sleeper.core.util.SplitIntoBatches; +import sleeper.invoke.tables.InvokeForTables; import java.time.Instant; -import java.util.List; -import java.util.UUID; -import static java.util.stream.Collectors.toUnmodifiableList; import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET; import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.GARBAGE_COLLECTOR_QUEUE_URL; import static sleeper.configuration.properties.instance.GarbageCollectionProperty.GARBAGE_COLLECT_OFFLINE_TABLES; @@ -73,25 +67,11 @@ public Void handleRequest(ScheduledEvent event, Context context) { String queueUrl = instanceProperties.get(GARBAGE_COLLECTOR_QUEUE_URL); boolean offlineEnabled = instanceProperties.getBoolean(GARBAGE_COLLECT_OFFLINE_TABLES); TableIndex tableIndex = new DynamoDBTableIndex(instanceProperties, dynamoClient); - SplitIntoBatches.reusingListOfSize(10, - offlineEnabled ? tableIndex.streamAllTables() : tableIndex.streamOnlineTables(), - tables -> sendMessageBatch(tables, queueUrl)); + InvokeForTables.sendOneMessagePerTable(sqsClient, queueUrl, + offlineEnabled ? tableIndex.streamAllTables() : tableIndex.streamOnlineTables()); Instant finishTime = Instant.now(); LOGGER.info("Lambda finished at {} (ran for {})", finishTime, LoggedDuration.withFullOutput(startTime, finishTime)); return null; } - - private void sendMessageBatch(List tables, String queueUrl) { - sqsClient.sendMessageBatch(new SendMessageBatchRequest() - .withQueueUrl(queueUrl) - .withEntries(tables.stream() - .map(table -> new SendMessageBatchRequestEntry() - .withMessageDeduplicationId(UUID.randomUUID().toString()) - .withId(table.getTableUniqueId()) - .withMessageGroupId(table.getTableUniqueId()) - .withMessageBody(table.getTableUniqueId())) - .collect(toUnmodifiableList()))); - } - } diff --git a/java/metrics/pom.xml b/java/metrics/pom.xml index e76c4033de..93ad47817a 100644 --- a/java/metrics/pom.xml +++ b/java/metrics/pom.xml @@ -58,6 +58,11 @@ statestore ${project.parent.version} + + sleeper + common-invoke-tables + ${project.parent.version} + sleeper diff --git a/java/metrics/src/main/java/sleeper/metrics/TableMetricsTriggerLambda.java b/java/metrics/src/main/java/sleeper/metrics/TableMetricsTriggerLambda.java index 287ba3876a..f020901b14 100644 --- a/java/metrics/src/main/java/sleeper/metrics/TableMetricsTriggerLambda.java +++ b/java/metrics/src/main/java/sleeper/metrics/TableMetricsTriggerLambda.java @@ -24,8 +24,6 @@ import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; -import com.amazonaws.services.sqs.model.SendMessageBatchRequest; -import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,15 +31,11 @@ import sleeper.configuration.properties.instance.InstanceProperties; import sleeper.configuration.table.index.DynamoDBTableIndex; import sleeper.core.table.TableIndex; -import sleeper.core.table.TableStatus; import sleeper.core.util.LoggedDuration; -import sleeper.core.util.SplitIntoBatches; +import sleeper.invoke.tables.InvokeForTables; import java.time.Instant; -import java.util.List; -import java.util.UUID; -import static java.util.stream.Collectors.toUnmodifiableList; import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET; import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.TABLE_METRICS_QUEUE_URL; import static sleeper.configuration.properties.instance.CommonProperty.METRICS_FOR_OFFLINE_TABLES; @@ -70,24 +64,11 @@ public Void handleRequest(ScheduledEvent event, Context context) { TableIndex tableIndex = new DynamoDBTableIndex(instanceProperties, dynamoClient); String queueUrl = instanceProperties.get(TABLE_METRICS_QUEUE_URL); boolean offlineEnabled = instanceProperties.getBoolean(METRICS_FOR_OFFLINE_TABLES); - SplitIntoBatches.reusingListOfSize(10, - offlineEnabled ? tableIndex.streamAllTables() : tableIndex.streamOnlineTables(), - tables -> sendMessageBatch(tables, queueUrl)); + InvokeForTables.sendOneMessagePerTable(sqsClient, queueUrl, + offlineEnabled ? tableIndex.streamAllTables() : tableIndex.streamOnlineTables()); Instant finishTime = Instant.now(); LOGGER.info("Lambda finished at {} (ran for {})", finishTime, LoggedDuration.withFullOutput(startTime, finishTime)); return null; } - - private void sendMessageBatch(List tables, String queueUrl) { - sqsClient.sendMessageBatch(new SendMessageBatchRequest() - .withQueueUrl(queueUrl) - .withEntries(tables.stream() - .map(table -> new SendMessageBatchRequestEntry() - .withMessageDeduplicationId(UUID.randomUUID().toString()) - .withId(table.getTableUniqueId()) - .withMessageGroupId(table.getTableUniqueId()) - .withMessageBody(table.getTableUniqueId())) - .collect(toUnmodifiableList()))); - } } diff --git a/java/splitter/splitter-lambda/pom.xml b/java/splitter/splitter-lambda/pom.xml index 8f668ccb77..167dd43ae2 100644 --- a/java/splitter/splitter-lambda/pom.xml +++ b/java/splitter/splitter-lambda/pom.xml @@ -62,6 +62,11 @@ splitter-core ${project.parent.version} + + sleeper + common-invoke-tables + ${project.parent.version} + org.testcontainers diff --git a/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/FindPartitionsToSplitTriggerLambda.java b/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/FindPartitionsToSplitTriggerLambda.java index 56ea19c00a..f7fa22b1c9 100644 --- a/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/FindPartitionsToSplitTriggerLambda.java +++ b/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/FindPartitionsToSplitTriggerLambda.java @@ -24,8 +24,6 @@ import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; -import com.amazonaws.services.sqs.model.SendMessageBatchRequest; -import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,15 +31,11 @@ import sleeper.configuration.properties.instance.InstanceProperties; import sleeper.configuration.table.index.DynamoDBTableIndex; import sleeper.core.table.TableIndex; -import sleeper.core.table.TableStatus; import sleeper.core.util.LoggedDuration; -import sleeper.core.util.SplitIntoBatches; +import sleeper.invoke.tables.InvokeForTables; import java.time.Instant; -import java.util.List; -import java.util.UUID; -import static java.util.stream.Collectors.toUnmodifiableList; import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET; import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.FIND_PARTITIONS_TO_SPLIT_QUEUE_URL; @@ -71,25 +65,10 @@ public Void handleRequest(ScheduledEvent event, Context context) { String queueUrl = instanceProperties.get(FIND_PARTITIONS_TO_SPLIT_QUEUE_URL); TableIndex tableIndex = new DynamoDBTableIndex(instanceProperties, dynamoClient); - SplitIntoBatches.reusingListOfSize(10, - tableIndex.streamOnlineTables(), - tables -> sendMessageBatch(tables, queueUrl)); + InvokeForTables.sendOneMessagePerTable(sqsClient, queueUrl, tableIndex.streamOnlineTables()); Instant finishTime = Instant.now(); LOGGER.info("Lambda finished at {} (ran for {})", finishTime, LoggedDuration.withFullOutput(startTime, finishTime)); return null; } - - private void sendMessageBatch(List tables, String queueUrl) { - sqsClient.sendMessageBatch(new SendMessageBatchRequest() - .withQueueUrl(queueUrl) - .withEntries(tables.stream() - .map(table -> new SendMessageBatchRequestEntry() - .withMessageDeduplicationId(UUID.randomUUID().toString()) - .withId(table.getTableUniqueId()) - .withMessageGroupId(table.getTableUniqueId()) - .withMessageBody(table.getTableUniqueId())) - .collect(toUnmodifiableList()))); - } - }