diff --git a/.github/config/chunks.yaml b/.github/config/chunks.yaml
index d29a863959..d654ec72da 100644
--- a/.github/config/chunks.yaml
+++ b/.github/config/chunks.yaml
@@ -18,7 +18,7 @@ chunks:
common:
name: Common
workflow: chunk-common.yaml
- modules: [ core, configuration, sketches, parquet, common/common-job, common/common-task, build, common/dynamodb-tools ]
+ modules: [ core, configuration, sketches, parquet, common/common-job, common/common-task, common/common-invoke-tables, build, common/dynamodb-tools ]
compaction:
name: Compaction
workflow: chunk-compaction.yaml
diff --git a/.github/workflows/chunk-cdk.yaml b/.github/workflows/chunk-cdk.yaml
index 867515d3b7..07ffa5111d 100644
--- a/.github/workflows/chunk-cdk.yaml
+++ b/.github/workflows/chunk-cdk.yaml
@@ -14,10 +14,10 @@ on:
- 'java/cdk-environment/**'
- 'java/system-test/system-test-cdk/**'
- 'java/system-test/system-test-configuration/**'
+ - 'java/system-test/system-test-data-generation/**'
- 'java/system-test/system-test-dsl/**'
- 'java/system-test/system-test-drivers/**'
- 'java/system-test/system-test-suite/**'
- - 'java/system-test/system-test-data-generation/**'
- 'java/bulk-import/bulk-import-starter/**'
- 'java/clients/**'
- 'java/ingest/ingest-batcher-submitter/**'
@@ -34,6 +34,7 @@ on:
- 'java/ingest/ingest-runner/**'
- 'java/compaction/compaction-status-store/**'
- 'java/compaction/compaction-job-creation/**'
+ - 'java/common/common-invoke-tables/**'
- 'java/common/common-job/**'
- 'java/ingest/ingest-status-store/**'
- 'java/sketches/**'
diff --git a/.github/workflows/chunk-clients.yaml b/.github/workflows/chunk-clients.yaml
index bb2b4f58ae..b8a4f335c0 100644
--- a/.github/workflows/chunk-clients.yaml
+++ b/.github/workflows/chunk-clients.yaml
@@ -13,6 +13,7 @@ on:
- 'java/compaction/compaction-status-store/**'
- 'java/splitter/splitter-core/**'
- 'java/ingest/ingest-batcher-store/**'
+ - 'java/common/common-invoke-tables/**'
- 'java/common/common-task/**'
- 'java/ingest/ingest-batcher-core/**'
- 'java/compaction/compaction-core/**'
diff --git a/.github/workflows/chunk-common.yaml b/.github/workflows/chunk-common.yaml
index 3b4eaf3253..7c3203fd08 100644
--- a/.github/workflows/chunk-common.yaml
+++ b/.github/workflows/chunk-common.yaml
@@ -15,6 +15,7 @@ on:
- 'java/parquet/**'
- 'java/common/common-job/**'
- 'java/common/common-task/**'
+ - 'java/common/common-invoke-tables/**'
- 'java/build/**'
- 'java/common/dynamodb-tools/**'
diff --git a/.github/workflows/chunk-compaction.yaml b/.github/workflows/chunk-compaction.yaml
index 62d02a2977..4ebb2b081b 100644
--- a/.github/workflows/chunk-compaction.yaml
+++ b/.github/workflows/chunk-compaction.yaml
@@ -19,6 +19,7 @@ on:
- 'java/splitter/splitter-core/**'
- 'java/splitter/splitter-lambda/**'
- 'java/common/common-task/**'
+ - 'java/common/common-invoke-tables/**'
- 'java/ingest/ingest-runner/**'
- 'java/common/common-job/**'
- 'java/ingest/ingest-status-store/**'
diff --git a/.github/workflows/chunk-data.yaml b/.github/workflows/chunk-data.yaml
index fe15b910c1..0cfdf66537 100644
--- a/.github/workflows/chunk-data.yaml
+++ b/.github/workflows/chunk-data.yaml
@@ -8,12 +8,13 @@ on:
- 'code-style/checkstyle*.xml'
- 'code-style/spotbugs*.xml'
- 'java/pom.xml'
- - 'java/metrics/**'
- 'java/garbage-collector/**'
+ - 'java/metrics/**'
+ - 'java/common/common-invoke-tables/**'
- 'java/statestore/**'
- - 'java/common/dynamodb-tools/**'
- 'java/parquet/**'
- 'java/configuration/**'
+ - 'java/common/dynamodb-tools/**'
- 'java/core/**'
jobs:
diff --git a/java/clients/pom.xml b/java/clients/pom.xml
index 1581690842..ce9125cfd0 100644
--- a/java/clients/pom.xml
+++ b/java/clients/pom.xml
@@ -136,6 +136,11 @@
ingest-batcher-store
${project.parent.version}
+
+ sleeper
+ common-invoke-tables
+ ${project.parent.version}
+
sleeper
diff --git a/java/clients/src/main/java/sleeper/clients/status/update/TriggerGarbageCollectionClient.java b/java/clients/src/main/java/sleeper/clients/status/update/TriggerGarbageCollectionClient.java
index b69a9011ec..a9fcbb7115 100644
--- a/java/clients/src/main/java/sleeper/clients/status/update/TriggerGarbageCollectionClient.java
+++ b/java/clients/src/main/java/sleeper/clients/status/update/TriggerGarbageCollectionClient.java
@@ -24,11 +24,8 @@
import sleeper.configuration.properties.instance.InstanceProperties;
import sleeper.configuration.table.index.DynamoDBTableIndex;
-import sleeper.core.table.InvokeForTableRequest;
-import sleeper.core.table.InvokeForTableRequestSerDe;
import sleeper.core.table.TableIndex;
-import sleeper.core.table.TableNotFoundException;
-import sleeper.core.table.TableStatus;
+import sleeper.invoke.tables.InvokeForTables;
import java.util.List;
import java.util.stream.Stream;
@@ -58,14 +55,8 @@ public static void main(String[] args) {
InstanceProperties instanceProperties = new InstanceProperties();
instanceProperties.loadFromS3GivenInstanceId(s3Client, instanceId);
TableIndex tableIndex = new DynamoDBTableIndex(instanceProperties, dynamoClient);
- List tables = tableNames.stream()
- .map(name -> tableIndex.getTableByName(name)
- .orElseThrow(() -> TableNotFoundException.withTableName(name)))
- .collect(toUnmodifiableList());
String queueUrl = instanceProperties.get(GARBAGE_COLLECTOR_QUEUE_URL);
- InvokeForTableRequestSerDe serDe = new InvokeForTableRequestSerDe();
- InvokeForTableRequest.forTables(tables.stream(), tables.size(),
- request -> sqsClient.sendMessage(queueUrl, serDe.toJson(request)));
+ InvokeForTables.sendOneMessagePerTableByName(sqsClient, queueUrl, tableIndex, tableNames);
} finally {
s3Client.shutdown();
dynamoClient.shutdown();
diff --git a/java/common/common-invoke-tables/pom.xml b/java/common/common-invoke-tables/pom.xml
new file mode 100644
index 0000000000..24db271ea9
--- /dev/null
+++ b/java/common/common-invoke-tables/pom.xml
@@ -0,0 +1,75 @@
+
+
+
+
+
+ common
+ sleeper
+ 0.23.0-SNAPSHOT
+
+
+ 4.0.0
+
+ common-invoke-tables
+
+
+
+ com.amazonaws
+ aws-java-sdk-sqs
+ ${aws-java-sdk.version}
+
+
+
+ sleeper
+ core
+ ${project.parent.version}
+
+
+
+ sleeper
+ core
+ ${project.parent.version}
+ test-jar
+ test
+
+
+ sleeper
+ configuration
+ ${project.parent.version}
+ test-jar
+ test
+
+
+ org.testcontainers
+ testcontainers
+ test
+
+
+ org.testcontainers
+ localstack
+ test
+
+
+ org.testcontainers
+ junit-jupiter
+ test
+
+
+
+
\ No newline at end of file
diff --git a/java/common/common-invoke-tables/src/main/java/sleeper/invoke/tables/InvokeForTables.java b/java/common/common-invoke-tables/src/main/java/sleeper/invoke/tables/InvokeForTables.java
new file mode 100644
index 0000000000..70beac9f10
--- /dev/null
+++ b/java/common/common-invoke-tables/src/main/java/sleeper/invoke/tables/InvokeForTables.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2022-2024 Crown Copyright
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package sleeper.invoke.tables;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
+import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
+
+import sleeper.core.table.TableIndex;
+import sleeper.core.table.TableNotFoundException;
+import sleeper.core.table.TableStatus;
+import sleeper.core.util.SplitIntoBatches;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.toUnmodifiableList;
+
+public class InvokeForTables {
+
+ private InvokeForTables() {
+ }
+
+ public static void sendOneMessagePerTable(AmazonSQS sqsClient, String queueUrl, Stream tables) {
+ // Limit to stay under the maximum size for an SQS sendMessageBatch call.
+ SplitIntoBatches.reusingListOfSize(10, tables,
+ batch -> sendMessageBatch(sqsClient, queueUrl, batch));
+ }
+
+ public static void sendOneMessagePerTableByName(
+ AmazonSQS sqsClient, String queueUrl, TableIndex tableIndex, List tableNames) {
+ List tables = tableNames.stream().map(name -> tableIndex.getTableByName(name)
+ .orElseThrow(() -> TableNotFoundException.withTableName(name)))
+ .collect(toUnmodifiableList());
+ sendOneMessagePerTable(sqsClient, queueUrl, tables.stream());
+ }
+
+ private static void sendMessageBatch(AmazonSQS sqsClient, String queueUrl, List tablesBatch) {
+ sqsClient.sendMessageBatch(new SendMessageBatchRequest()
+ .withQueueUrl(queueUrl)
+ .withEntries(tablesBatch.stream()
+ .map(table -> new SendMessageBatchRequestEntry()
+ .withMessageDeduplicationId(UUID.randomUUID().toString())
+ .withId(table.getTableUniqueId())
+ .withMessageGroupId(table.getTableUniqueId())
+ .withMessageBody(table.getTableUniqueId()))
+ .collect(toUnmodifiableList())));
+ }
+}
diff --git a/java/common/common-invoke-tables/src/test/java/sleeper/invoke/tables/InvokeForTablesIT.java b/java/common/common-invoke-tables/src/test/java/sleeper/invoke/tables/InvokeForTablesIT.java
new file mode 100644
index 0000000000..e9c27584f0
--- /dev/null
+++ b/java/common/common-invoke-tables/src/test/java/sleeper/invoke/tables/InvokeForTablesIT.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2022-2024 Crown Copyright
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package sleeper.invoke.tables;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
+import com.amazonaws.services.sqs.model.CreateQueueRequest;
+import com.amazonaws.services.sqs.model.CreateQueueResult;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.amazonaws.services.sqs.model.ReceiveMessageResult;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.localstack.LocalStackContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import sleeper.core.CommonTestConstants;
+import sleeper.core.table.InMemoryTableIndex;
+import sleeper.core.table.TableIndex;
+import sleeper.core.table.TableNotFoundException;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.toUnmodifiableList;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static sleeper.configuration.testutils.LocalStackAwsV1ClientHelper.buildAwsV1Client;
+import static sleeper.core.table.TableStatusTestHelper.uniqueIdAndName;
+
+@Testcontainers
+public class InvokeForTablesIT {
+
+ @Container
+ public static LocalStackContainer localStackContainer = new LocalStackContainer(DockerImageName.parse(CommonTestConstants.LOCALSTACK_DOCKER_IMAGE))
+ .withServices(LocalStackContainer.Service.SQS);
+
+ private final AmazonSQS sqsClient = buildAwsV1Client(localStackContainer, LocalStackContainer.Service.SQS, AmazonSQSClientBuilder.standard());
+
+ @Test
+ void shouldSendOneMessage() {
+ // Given
+ String queueUrl = createFifoQueueGetUrl();
+
+ // When
+ InvokeForTables.sendOneMessagePerTable(sqsClient, queueUrl, Stream.of(
+ uniqueIdAndName("table-id", "table-name")));
+
+ // Then
+ assertThat(receiveTableIdMessages(queueUrl, 2))
+ .containsExactly("table-id");
+ }
+
+ @Test
+ void shouldSendMoreMessagesThanFitInAnSqsSendMessageBatch() {
+ // Given a FIFO queue
+ String queueUrl = createFifoQueueGetUrl();
+
+ // When we send more than the SQS hard limit of 10 messages to send in a single batch
+ InvokeForTables.sendOneMessagePerTable(sqsClient, queueUrl,
+ IntStream.rangeClosed(1, 11)
+ .mapToObj(i -> uniqueIdAndName("table-id-" + i, "table-name-" + i)));
+
+ // Then we can receive those messages
+ assertThat(receiveTableIdMessages(queueUrl, 10)).containsExactly(
+ "table-id-1", "table-id-2", "table-id-3", "table-id-4", "table-id-5",
+ "table-id-6", "table-id-7", "table-id-8", "table-id-9", "table-id-10");
+ assertThat(receiveTableIdMessages(queueUrl, 10)).containsExactly(
+ "table-id-11");
+ }
+
+ @Test
+ void shouldLookUpTableByName() {
+ // Given
+ String queueUrl = createFifoQueueGetUrl();
+ TableIndex tableIndex = new InMemoryTableIndex();
+ tableIndex.create(uniqueIdAndName("table-id", "table-name"));
+
+ // When
+ InvokeForTables.sendOneMessagePerTableByName(sqsClient, queueUrl, tableIndex, List.of("table-name"));
+
+ // Then
+ assertThat(receiveTableIdMessages(queueUrl, 2))
+ .containsExactly("table-id");
+ }
+
+ @Test
+ void shouldFailLookUpTableByName() {
+ // Given
+ String queueUrl = createFifoQueueGetUrl();
+ TableIndex tableIndex = new InMemoryTableIndex();
+
+ // When / Then
+ assertThatThrownBy(() -> InvokeForTables.sendOneMessagePerTableByName(
+ sqsClient, queueUrl, tableIndex, List.of("missing-table")))
+ .isInstanceOf(TableNotFoundException.class);
+ assertThat(receiveTableIdMessages(queueUrl, 1))
+ .isEmpty();
+ }
+
+ @Test
+ void shouldFailLookUpTableByNameOnSecondPage() {
+ // Given
+ String queueUrl = createFifoQueueGetUrl();
+ TableIndex tableIndex = new InMemoryTableIndex();
+ IntStream.rangeClosed(1, 11)
+ .mapToObj(i -> uniqueIdAndName("table-id-" + i, "table-name-" + i))
+ .forEach(tableIndex::create);
+
+ // When / Then
+ assertThatThrownBy(() -> InvokeForTables.sendOneMessagePerTableByName(sqsClient, queueUrl, tableIndex,
+ IntStream.rangeClosed(1, 12)
+ .mapToObj(i -> "table-name-" + i)
+ .collect(toUnmodifiableList())))
+ .isInstanceOf(TableNotFoundException.class);
+ assertThat(receiveTableIdMessages(queueUrl, 10))
+ .isEmpty();
+ }
+
+ private String createFifoQueueGetUrl() {
+ CreateQueueResult result = sqsClient.createQueue(new CreateQueueRequest()
+ .withQueueName(UUID.randomUUID().toString() + ".fifo")
+ .withAttributes(Map.of("FifoQueue", "true")));
+ return result.getQueueUrl();
+ }
+
+ private List receiveTableIdMessages(String queueUrl, int maxMessages) {
+ ReceiveMessageResult result = sqsClient.receiveMessage(
+ new ReceiveMessageRequest(queueUrl)
+ .withMaxNumberOfMessages(maxMessages)
+ .withWaitTimeSeconds(0));
+ return result.getMessages().stream()
+ .map(Message::getBody)
+ .collect(toUnmodifiableList());
+ }
+
+}
diff --git a/java/common/pom.xml b/java/common/pom.xml
index 8409bfdf62..e04ff6ce0b 100644
--- a/java/common/pom.xml
+++ b/java/common/pom.xml
@@ -29,6 +29,7 @@
common-job
common-task
+ common-invoke-tables
dynamodb-tools
diff --git a/java/compaction/compaction-job-creation-lambda/pom.xml b/java/compaction/compaction-job-creation-lambda/pom.xml
index e656f2c57e..b778867611 100644
--- a/java/compaction/compaction-job-creation-lambda/pom.xml
+++ b/java/compaction/compaction-job-creation-lambda/pom.xml
@@ -67,6 +67,11 @@
configuration
${project.parent.version}
+
+ sleeper
+ common-invoke-tables
+ ${project.parent.version}
+
com.amazonaws
diff --git a/java/compaction/compaction-job-creation-lambda/src/main/java/sleeper/compaction/job/creation/lambda/CreateCompactionJobsTriggerLambda.java b/java/compaction/compaction-job-creation-lambda/src/main/java/sleeper/compaction/job/creation/lambda/CreateCompactionJobsTriggerLambda.java
index 1fc3992fa2..4f7bdf8ce3 100644
--- a/java/compaction/compaction-job-creation-lambda/src/main/java/sleeper/compaction/job/creation/lambda/CreateCompactionJobsTriggerLambda.java
+++ b/java/compaction/compaction-job-creation-lambda/src/main/java/sleeper/compaction/job/creation/lambda/CreateCompactionJobsTriggerLambda.java
@@ -24,8 +24,6 @@
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
-import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
-import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,15 +31,11 @@
import sleeper.configuration.properties.instance.InstanceProperties;
import sleeper.configuration.table.index.DynamoDBTableIndex;
import sleeper.core.table.TableIndex;
-import sleeper.core.table.TableStatus;
import sleeper.core.util.LoggedDuration;
-import sleeper.core.util.SplitIntoBatches;
+import sleeper.invoke.tables.InvokeForTables;
import java.time.Instant;
-import java.util.List;
-import java.util.UUID;
-import static java.util.stream.Collectors.toUnmodifiableList;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_CREATION_QUEUE_URL;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET;
@@ -72,25 +66,10 @@ public Void handleRequest(ScheduledEvent event, Context context) {
String queueUrl = instanceProperties.get(COMPACTION_JOB_CREATION_QUEUE_URL);
TableIndex tableIndex = new DynamoDBTableIndex(instanceProperties, dynamoClient);
- SplitIntoBatches.reusingListOfSize(10,
- tableIndex.streamOnlineTables(),
- tables -> sendMessageBatch(tables, queueUrl));
+ InvokeForTables.sendOneMessagePerTable(sqsClient, queueUrl, tableIndex.streamOnlineTables());
Instant finishTime = Instant.now();
LOGGER.info("Lambda finished at {} (ran for {})", finishTime, LoggedDuration.withFullOutput(startTime, finishTime));
return null;
}
-
- private void sendMessageBatch(List tables, String queueUrl) {
- sqsClient.sendMessageBatch(new SendMessageBatchRequest()
- .withQueueUrl(queueUrl)
- .withEntries(tables.stream()
- .map(table -> new SendMessageBatchRequestEntry()
- .withMessageDeduplicationId(UUID.randomUUID().toString())
- .withId(table.getTableUniqueId())
- .withMessageGroupId(table.getTableUniqueId())
- .withMessageBody(table.getTableUniqueId()))
- .collect(toUnmodifiableList())));
- }
-
}
diff --git a/java/core/src/main/java/sleeper/core/table/InvokeForTableRequest.java b/java/core/src/main/java/sleeper/core/table/InvokeForTableRequest.java
deleted file mode 100644
index 2689f33f89..0000000000
--- a/java/core/src/main/java/sleeper/core/table/InvokeForTableRequest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Copyright 2022-2024 Crown Copyright
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package sleeper.core.table;
-
-import sleeper.core.util.SplitIntoBatches;
-
-import java.util.List;
-import java.util.Objects;
-import java.util.function.Consumer;
-import java.util.stream.Stream;
-
-/**
- * Model for an SQS message to invoke some operation for a batch of tables. Use {@link InvokeForTableRequestSerDe} to
- * build SQS messages from objects of this class.
- */
-public class InvokeForTableRequest {
-
- private final List tableIds;
-
- public InvokeForTableRequest(List tableIds) {
- this.tableIds = tableIds;
- }
-
- public List getTableIds() {
- return tableIds;
- }
-
- /**
- * Creates batches of tables, and sends a request for each batch.
- *
- * @param tables a stream of tables to batch
- * @param batchSize the batch size
- * @param sendRequest a function to send a request
- */
- public static void forTables(Stream tables, int batchSize, Consumer sendRequest) {
- SplitIntoBatches.reusingListOfSize(batchSize,
- tables.map(TableStatus::getTableUniqueId),
- tableIds -> sendRequest.accept(new InvokeForTableRequest(tableIds)));
- }
-
- /**
- * Loads tables from the table index, creates batches of tables, and sends a request for each batch.
- *
- * @param offlineEnabled whether to include offline tables when creating batches
- * @param tableIndex the table index
- * @param batchSize the batch size
- * @param sendRequest a function to send a request
- */
- public static void forTablesWithOfflineEnabled(
- boolean offlineEnabled, TableIndex tableIndex, int batchSize, Consumer sendRequest) {
- forTables(
- offlineEnabled ? tableIndex.streamAllTables() : tableIndex.streamOnlineTables(),
- batchSize, sendRequest);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(tableIds);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (!(obj instanceof InvokeForTableRequest)) {
- return false;
- }
- InvokeForTableRequest other = (InvokeForTableRequest) obj;
- return Objects.equals(tableIds, other.tableIds);
- }
-
- @Override
- public String toString() {
- return "InvokeForTableRequest{tableIds=" + tableIds + "}";
- }
-
-}
diff --git a/java/core/src/main/java/sleeper/core/table/InvokeForTableRequestSerDe.java b/java/core/src/main/java/sleeper/core/table/InvokeForTableRequestSerDe.java
deleted file mode 100644
index e785734731..0000000000
--- a/java/core/src/main/java/sleeper/core/table/InvokeForTableRequestSerDe.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2022-2024 Crown Copyright
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package sleeper.core.table;
-
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-
-/**
- * Serialisation and deserialisation of SQS messages to invoke some operation for a batch of tables.
- */
-public class InvokeForTableRequestSerDe {
-
- private final Gson gson = new GsonBuilder().create();
-
- /**
- * Serialises a request for a batch of tables to a JSON string.
- *
- * @param request the request
- * @return the JSON string
- */
- public String toJson(InvokeForTableRequest request) {
- return gson.toJson(request);
- }
-
- /**
- * Deserialises a request for a batch of tables from a JSON string.
- *
- * @param json the JSON string
- * @return the request
- */
- public InvokeForTableRequest fromJson(String json) {
- return gson.fromJson(json, InvokeForTableRequest.class);
- }
-}
diff --git a/java/core/src/test/java/sleeper/core/table/InvokeForTableRequestTest.java b/java/core/src/test/java/sleeper/core/table/InvokeForTableRequestTest.java
deleted file mode 100644
index f4eb193e3e..0000000000
--- a/java/core/src/test/java/sleeper/core/table/InvokeForTableRequestTest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright 2022-2024 Crown Copyright
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package sleeper.core.table;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Stream;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-public class InvokeForTableRequestTest {
-
- private final InvokeForTableRequestSerDe serDe = new InvokeForTableRequestSerDe();
-
- @Test
- void shouldSendRequestForTwoTables() {
- List sent = new ArrayList<>();
- InvokeForTableRequest.forTables(
- Stream.of(table("table-1"), table("table-2")),
- 1, request -> sent.add(serDe.toJson(request)));
- assertThat(sent).extracting(serDe::fromJson).containsExactly(
- new InvokeForTableRequest(List.of("table-1")),
- new InvokeForTableRequest(List.of("table-2")));
- }
-
- @Test
- void shouldSendBatchesOf2() {
- List sent = new ArrayList<>();
- InvokeForTableRequest.forTables(
- Stream.of(table("table-1"), table("table-2")),
- 2, request -> sent.add(serDe.toJson(request)));
- assertThat(sent).extracting(serDe::fromJson).containsExactly(
- new InvokeForTableRequest(List.of("table-1", "table-2")));
- }
-
- @Test
- void shouldSendRequestForOnlyOnlineTable() {
- // Given
- TableIndex tableIndex = new InMemoryTableIndex();
- tableIndex.create(table("offline-table").takeOffline());
- tableIndex.create(table("online-table"));
-
- // When
- List sent = new ArrayList<>();
- InvokeForTableRequest.forTablesWithOfflineEnabled(false, tableIndex,
- 1, request -> sent.add(serDe.toJson(request)));
-
- // Then
- assertThat(sent).extracting(serDe::fromJson).containsExactly(
- new InvokeForTableRequest(List.of("online-table")));
- }
-
- @Test
- void shouldSendRequestForAllTablesWhenOfflineEnabled() {
- // Given
- TableIndex tableIndex = new InMemoryTableIndex();
- tableIndex.create(table("offline-table").takeOffline());
- tableIndex.create(table("online-table"));
-
- // When
- List sent = new ArrayList<>();
- InvokeForTableRequest.forTablesWithOfflineEnabled(true, tableIndex,
- 1, request -> sent.add(serDe.toJson(request)));
-
- // Then
- assertThat(sent).extracting(serDe::fromJson).containsExactly(
- new InvokeForTableRequest(List.of("offline-table")),
- new InvokeForTableRequest(List.of("online-table")));
- }
-
- private TableStatus table(String tableName) {
- return TableStatusTestHelper.uniqueIdAndName(tableName, tableName);
- }
-}
diff --git a/java/garbage-collector/pom.xml b/java/garbage-collector/pom.xml
index f2b2ec2c14..09dc6e14e0 100644
--- a/java/garbage-collector/pom.xml
+++ b/java/garbage-collector/pom.xml
@@ -84,6 +84,11 @@
statestore
${project.parent.version}
+
+ sleeper
+ common-invoke-tables
+ ${project.parent.version}
+
sleeper
diff --git a/java/garbage-collector/src/main/java/sleeper/garbagecollector/GarbageCollectorTriggerLambda.java b/java/garbage-collector/src/main/java/sleeper/garbagecollector/GarbageCollectorTriggerLambda.java
index be27c6e001..6d3354bc14 100644
--- a/java/garbage-collector/src/main/java/sleeper/garbagecollector/GarbageCollectorTriggerLambda.java
+++ b/java/garbage-collector/src/main/java/sleeper/garbagecollector/GarbageCollectorTriggerLambda.java
@@ -24,8 +24,6 @@
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
-import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
-import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,15 +31,11 @@
import sleeper.configuration.properties.instance.InstanceProperties;
import sleeper.configuration.table.index.DynamoDBTableIndex;
import sleeper.core.table.TableIndex;
-import sleeper.core.table.TableStatus;
import sleeper.core.util.LoggedDuration;
-import sleeper.core.util.SplitIntoBatches;
+import sleeper.invoke.tables.InvokeForTables;
import java.time.Instant;
-import java.util.List;
-import java.util.UUID;
-import static java.util.stream.Collectors.toUnmodifiableList;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.GARBAGE_COLLECTOR_QUEUE_URL;
import static sleeper.configuration.properties.instance.GarbageCollectionProperty.GARBAGE_COLLECT_OFFLINE_TABLES;
@@ -73,25 +67,11 @@ public Void handleRequest(ScheduledEvent event, Context context) {
String queueUrl = instanceProperties.get(GARBAGE_COLLECTOR_QUEUE_URL);
boolean offlineEnabled = instanceProperties.getBoolean(GARBAGE_COLLECT_OFFLINE_TABLES);
TableIndex tableIndex = new DynamoDBTableIndex(instanceProperties, dynamoClient);
- SplitIntoBatches.reusingListOfSize(10,
- offlineEnabled ? tableIndex.streamAllTables() : tableIndex.streamOnlineTables(),
- tables -> sendMessageBatch(tables, queueUrl));
+ InvokeForTables.sendOneMessagePerTable(sqsClient, queueUrl,
+ offlineEnabled ? tableIndex.streamAllTables() : tableIndex.streamOnlineTables());
Instant finishTime = Instant.now();
LOGGER.info("Lambda finished at {} (ran for {})", finishTime, LoggedDuration.withFullOutput(startTime, finishTime));
return null;
}
-
- private void sendMessageBatch(List tables, String queueUrl) {
- sqsClient.sendMessageBatch(new SendMessageBatchRequest()
- .withQueueUrl(queueUrl)
- .withEntries(tables.stream()
- .map(table -> new SendMessageBatchRequestEntry()
- .withMessageDeduplicationId(UUID.randomUUID().toString())
- .withId(table.getTableUniqueId())
- .withMessageGroupId(table.getTableUniqueId())
- .withMessageBody(table.getTableUniqueId()))
- .collect(toUnmodifiableList())));
- }
-
}
diff --git a/java/metrics/pom.xml b/java/metrics/pom.xml
index e76c4033de..93ad47817a 100644
--- a/java/metrics/pom.xml
+++ b/java/metrics/pom.xml
@@ -58,6 +58,11 @@
statestore
${project.parent.version}
+
+ sleeper
+ common-invoke-tables
+ ${project.parent.version}
+
sleeper
diff --git a/java/metrics/src/main/java/sleeper/metrics/TableMetricsTriggerLambda.java b/java/metrics/src/main/java/sleeper/metrics/TableMetricsTriggerLambda.java
index 287ba3876a..f020901b14 100644
--- a/java/metrics/src/main/java/sleeper/metrics/TableMetricsTriggerLambda.java
+++ b/java/metrics/src/main/java/sleeper/metrics/TableMetricsTriggerLambda.java
@@ -24,8 +24,6 @@
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
-import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
-import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,15 +31,11 @@
import sleeper.configuration.properties.instance.InstanceProperties;
import sleeper.configuration.table.index.DynamoDBTableIndex;
import sleeper.core.table.TableIndex;
-import sleeper.core.table.TableStatus;
import sleeper.core.util.LoggedDuration;
-import sleeper.core.util.SplitIntoBatches;
+import sleeper.invoke.tables.InvokeForTables;
import java.time.Instant;
-import java.util.List;
-import java.util.UUID;
-import static java.util.stream.Collectors.toUnmodifiableList;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.TABLE_METRICS_QUEUE_URL;
import static sleeper.configuration.properties.instance.CommonProperty.METRICS_FOR_OFFLINE_TABLES;
@@ -70,24 +64,11 @@ public Void handleRequest(ScheduledEvent event, Context context) {
TableIndex tableIndex = new DynamoDBTableIndex(instanceProperties, dynamoClient);
String queueUrl = instanceProperties.get(TABLE_METRICS_QUEUE_URL);
boolean offlineEnabled = instanceProperties.getBoolean(METRICS_FOR_OFFLINE_TABLES);
- SplitIntoBatches.reusingListOfSize(10,
- offlineEnabled ? tableIndex.streamAllTables() : tableIndex.streamOnlineTables(),
- tables -> sendMessageBatch(tables, queueUrl));
+ InvokeForTables.sendOneMessagePerTable(sqsClient, queueUrl,
+ offlineEnabled ? tableIndex.streamAllTables() : tableIndex.streamOnlineTables());
Instant finishTime = Instant.now();
LOGGER.info("Lambda finished at {} (ran for {})", finishTime, LoggedDuration.withFullOutput(startTime, finishTime));
return null;
}
-
- private void sendMessageBatch(List tables, String queueUrl) {
- sqsClient.sendMessageBatch(new SendMessageBatchRequest()
- .withQueueUrl(queueUrl)
- .withEntries(tables.stream()
- .map(table -> new SendMessageBatchRequestEntry()
- .withMessageDeduplicationId(UUID.randomUUID().toString())
- .withId(table.getTableUniqueId())
- .withMessageGroupId(table.getTableUniqueId())
- .withMessageBody(table.getTableUniqueId()))
- .collect(toUnmodifiableList())));
- }
}
diff --git a/java/splitter/splitter-lambda/pom.xml b/java/splitter/splitter-lambda/pom.xml
index 8f668ccb77..167dd43ae2 100644
--- a/java/splitter/splitter-lambda/pom.xml
+++ b/java/splitter/splitter-lambda/pom.xml
@@ -62,6 +62,11 @@
splitter-core
${project.parent.version}
+
+ sleeper
+ common-invoke-tables
+ ${project.parent.version}
+
org.testcontainers
diff --git a/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/FindPartitionsToSplitTriggerLambda.java b/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/FindPartitionsToSplitTriggerLambda.java
index 56ea19c00a..f7fa22b1c9 100644
--- a/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/FindPartitionsToSplitTriggerLambda.java
+++ b/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/FindPartitionsToSplitTriggerLambda.java
@@ -24,8 +24,6 @@
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
-import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
-import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,15 +31,11 @@
import sleeper.configuration.properties.instance.InstanceProperties;
import sleeper.configuration.table.index.DynamoDBTableIndex;
import sleeper.core.table.TableIndex;
-import sleeper.core.table.TableStatus;
import sleeper.core.util.LoggedDuration;
-import sleeper.core.util.SplitIntoBatches;
+import sleeper.invoke.tables.InvokeForTables;
import java.time.Instant;
-import java.util.List;
-import java.util.UUID;
-import static java.util.stream.Collectors.toUnmodifiableList;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.FIND_PARTITIONS_TO_SPLIT_QUEUE_URL;
@@ -71,25 +65,10 @@ public Void handleRequest(ScheduledEvent event, Context context) {
String queueUrl = instanceProperties.get(FIND_PARTITIONS_TO_SPLIT_QUEUE_URL);
TableIndex tableIndex = new DynamoDBTableIndex(instanceProperties, dynamoClient);
- SplitIntoBatches.reusingListOfSize(10,
- tableIndex.streamOnlineTables(),
- tables -> sendMessageBatch(tables, queueUrl));
+ InvokeForTables.sendOneMessagePerTable(sqsClient, queueUrl, tableIndex.streamOnlineTables());
Instant finishTime = Instant.now();
LOGGER.info("Lambda finished at {} (ran for {})", finishTime, LoggedDuration.withFullOutput(startTime, finishTime));
return null;
}
-
- private void sendMessageBatch(List tables, String queueUrl) {
- sqsClient.sendMessageBatch(new SendMessageBatchRequest()
- .withQueueUrl(queueUrl)
- .withEntries(tables.stream()
- .map(table -> new SendMessageBatchRequestEntry()
- .withMessageDeduplicationId(UUID.randomUUID().toString())
- .withId(table.getTableUniqueId())
- .withMessageGroupId(table.getTableUniqueId())
- .withMessageBody(table.getTableUniqueId()))
- .collect(toUnmodifiableList())));
- }
-
}