Skip to content

Commit

Permalink
Invoke table operations with InvokeForTables
Browse files Browse the repository at this point in the history
  • Loading branch information
patchwork01 committed Apr 25, 2024
1 parent f84c17e commit 358d0f7
Show file tree
Hide file tree
Showing 12 changed files with 107 additions and 122 deletions.
5 changes: 5 additions & 0 deletions java/clients/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@
<artifactId>ingest-batcher-store</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>sleeper</groupId>
<artifactId>common-invoke-tables</artifactId>
<version>${project.parent.version}</version>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>sleeper</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,13 @@
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;

import sleeper.configuration.properties.instance.InstanceProperties;
import sleeper.configuration.table.index.DynamoDBTableIndex;
import sleeper.core.table.TableIndex;
import sleeper.core.table.TableNotFoundException;
import sleeper.core.table.TableStatus;
import sleeper.core.util.SplitIntoBatches;
import sleeper.invoke.tables.InvokeForTables;

import java.util.List;
import java.util.UUID;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toUnmodifiableList;
Expand Down Expand Up @@ -60,30 +55,12 @@ public static void main(String[] args) {
InstanceProperties instanceProperties = new InstanceProperties();
instanceProperties.loadFromS3GivenInstanceId(s3Client, instanceId);
TableIndex tableIndex = new DynamoDBTableIndex(instanceProperties, dynamoClient);
List<TableStatus> tables = tableNames.stream()
.map(name -> tableIndex.getTableByName(name)
.orElseThrow(() -> TableNotFoundException.withTableName(name)))
.collect(toUnmodifiableList());
String queueUrl = instanceProperties.get(GARBAGE_COLLECTOR_QUEUE_URL);
SplitIntoBatches.reusingListOfSize(10,
tables.stream(),
batch -> sendMessageBatch(sqsClient, batch, queueUrl));
InvokeForTables.sendOneMessagePerTableByName(sqsClient, queueUrl, tableIndex, tableNames);
} finally {
s3Client.shutdown();
dynamoClient.shutdown();
sqsClient.shutdown();
}
}

public static void sendMessageBatch(AmazonSQS sqsClient, List<TableStatus> tables, String queueUrl) {
sqsClient.sendMessageBatch(new SendMessageBatchRequest()
.withQueueUrl(queueUrl)
.withEntries(tables.stream()
.map(table -> new SendMessageBatchRequestEntry()
.withMessageDeduplicationId(UUID.randomUUID().toString())
.withId(table.getTableUniqueId())
.withMessageGroupId(table.getTableUniqueId())
.withMessageBody(table.getTableUniqueId()))
.collect(toUnmodifiableList())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;

import sleeper.core.table.TableIndex;
import sleeper.core.table.TableNotFoundException;
import sleeper.core.table.TableStatus;
import sleeper.core.util.SplitIntoBatches;

import java.util.List;
import java.util.UUID;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toUnmodifiableList;

Expand All @@ -32,11 +35,19 @@ public class InvokeForTables {
private InvokeForTables() {
}

public static void sendOneMessagePerTable(AmazonSQS sqsClient, String queueUrl, List<TableStatus> tables) {
SplitIntoBatches.reusingListOfSize(10, tables.stream(),
public static void sendOneMessagePerTable(AmazonSQS sqsClient, String queueUrl, Stream<TableStatus> tables) {
SplitIntoBatches.reusingListOfSize(10, tables,
batch -> sendMessageBatch(sqsClient, queueUrl, batch));
}

public static void sendOneMessagePerTableByName(
AmazonSQS sqsClient, String queueUrl, TableIndex tableIndex, List<String> tableNames) {
List<TableStatus> tables = tableNames.stream().map(name -> tableIndex.getTableByName(name)
.orElseThrow(() -> TableNotFoundException.withTableName(name)))
.collect(toUnmodifiableList());
sendOneMessagePerTable(sqsClient, queueUrl, tables.stream());
}

private static void sendMessageBatch(AmazonSQS sqsClient, String queueUrl, List<TableStatus> tablesBatch) {
sqsClient.sendMessageBatch(new SendMessageBatchRequest()
.withQueueUrl(queueUrl)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,19 @@
import org.testcontainers.utility.DockerImageName;

import sleeper.core.CommonTestConstants;
import sleeper.core.table.InMemoryTableIndex;
import sleeper.core.table.TableIndex;
import sleeper.core.table.TableNotFoundException;

import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toUnmodifiableList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static sleeper.core.table.TableStatusTestHelper.uniqueIdAndName;
import static sleeper.invoke.tables.LocalStackAwsV1ClientHelper.buildAwsV1Client;

Expand All @@ -55,7 +60,7 @@ void shouldSendOneMessage() {
String queueUrl = createFifoQueueGetUrl();

// When
InvokeForTables.sendOneMessagePerTable(sqsClient, queueUrl, List.of(
InvokeForTables.sendOneMessagePerTable(sqsClient, queueUrl, Stream.of(
uniqueIdAndName("table-id", "table-name")));

// Then
Expand All @@ -71,8 +76,7 @@ void shouldSendMoreMessagesThanFitInABatch() {
// When
InvokeForTables.sendOneMessagePerTable(sqsClient, queueUrl,
IntStream.rangeClosed(1, 11)
.mapToObj(i -> uniqueIdAndName("table-id-" + i, "table-name-" + i))
.collect(toUnmodifiableList()));
.mapToObj(i -> uniqueIdAndName("table-id-" + i, "table-name-" + i)));

// Then
assertThat(receiveTableIdMessages(queueUrl, 10)).containsExactly(
Expand All @@ -82,6 +86,54 @@ void shouldSendMoreMessagesThanFitInABatch() {
"table-id-11");
}

@Test
void shouldLookUpTableByName() {
// Given
String queueUrl = createFifoQueueGetUrl();
TableIndex tableIndex = new InMemoryTableIndex();
tableIndex.create(uniqueIdAndName("table-id", "table-name"));

// When
InvokeForTables.sendOneMessagePerTableByName(sqsClient, queueUrl, tableIndex, List.of("table-name"));

// Then
assertThat(receiveTableIdMessages(queueUrl, 2))
.containsExactly("table-id");
}

@Test
void shouldFailLookUpTableByName() {
// Given
String queueUrl = createFifoQueueGetUrl();
TableIndex tableIndex = new InMemoryTableIndex();

// When / Then
assertThatThrownBy(() -> InvokeForTables.sendOneMessagePerTableByName(
sqsClient, queueUrl, tableIndex, List.of("missing-table")))
.isInstanceOf(TableNotFoundException.class);
assertThat(receiveTableIdMessages(queueUrl, 1))
.isEmpty();
}

@Test
void shouldFailLookUpTableByNameOnSecondPage() {
// Given
String queueUrl = createFifoQueueGetUrl();
TableIndex tableIndex = new InMemoryTableIndex();
IntStream.rangeClosed(1, 11)
.mapToObj(i -> uniqueIdAndName("table-id-" + i, "table-name-" + i))
.forEach(tableIndex::create);

// When / Then
assertThatThrownBy(() -> InvokeForTables.sendOneMessagePerTableByName(sqsClient, queueUrl, tableIndex,
IntStream.rangeClosed(1, 12)
.mapToObj(i -> "table-name-" + i)
.collect(toUnmodifiableList())))
.isInstanceOf(TableNotFoundException.class);
assertThat(receiveTableIdMessages(queueUrl, 10))
.isEmpty();
}

private String createFifoQueueGetUrl() {
CreateQueueResult result = sqsClient.createQueue(new CreateQueueRequest()
.withQueueName(UUID.randomUUID().toString() + ".fifo")
Expand All @@ -92,7 +144,8 @@ private String createFifoQueueGetUrl() {
private List<String> receiveTableIdMessages(String queueUrl, int maxMessages) {
ReceiveMessageResult result = sqsClient.receiveMessage(
new ReceiveMessageRequest(queueUrl)
.withMaxNumberOfMessages(maxMessages));
.withMaxNumberOfMessages(maxMessages)
.withWaitTimeSeconds(0));
return result.getMessages().stream()
.map(Message::getBody)
.collect(toUnmodifiableList());
Expand Down
5 changes: 5 additions & 0 deletions java/compaction/compaction-job-creation-lambda/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@
<artifactId>configuration</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>sleeper</groupId>
<artifactId>common-invoke-tables</artifactId>
<version>${project.parent.version}</version>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>com.amazonaws</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,18 @@
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import sleeper.configuration.properties.PropertiesReloader;
import sleeper.configuration.properties.instance.InstanceProperties;
import sleeper.configuration.table.index.DynamoDBTableIndex;
import sleeper.core.table.TableIndex;
import sleeper.core.table.TableStatus;
import sleeper.core.util.LoggedDuration;
import sleeper.core.util.SplitIntoBatches;
import sleeper.invoke.tables.InvokeForTables;

import java.time.Instant;
import java.util.List;
import java.util.UUID;

import static java.util.stream.Collectors.toUnmodifiableList;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_CREATION_QUEUE_URL;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET;

Expand Down Expand Up @@ -72,25 +66,10 @@ public Void handleRequest(ScheduledEvent event, Context context) {

String queueUrl = instanceProperties.get(COMPACTION_JOB_CREATION_QUEUE_URL);
TableIndex tableIndex = new DynamoDBTableIndex(instanceProperties, dynamoClient);
SplitIntoBatches.reusingListOfSize(10,
tableIndex.streamOnlineTables(),
tables -> sendMessageBatch(tables, queueUrl));
InvokeForTables.sendOneMessagePerTable(sqsClient, queueUrl, tableIndex.streamOnlineTables());

Instant finishTime = Instant.now();
LOGGER.info("Lambda finished at {} (ran for {})", finishTime, LoggedDuration.withFullOutput(startTime, finishTime));
return null;
}

private void sendMessageBatch(List<TableStatus> tables, String queueUrl) {
sqsClient.sendMessageBatch(new SendMessageBatchRequest()
.withQueueUrl(queueUrl)
.withEntries(tables.stream()
.map(table -> new SendMessageBatchRequestEntry()
.withMessageDeduplicationId(UUID.randomUUID().toString())
.withId(table.getTableUniqueId())
.withMessageGroupId(table.getTableUniqueId())
.withMessageBody(table.getTableUniqueId()))
.collect(toUnmodifiableList())));
}

}
5 changes: 5 additions & 0 deletions java/garbage-collector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@
<artifactId>statestore</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>sleeper</groupId>
<artifactId>common-invoke-tables</artifactId>
<version>${project.parent.version}</version>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>sleeper</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,18 @@
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import sleeper.configuration.properties.PropertiesReloader;
import sleeper.configuration.properties.instance.InstanceProperties;
import sleeper.configuration.table.index.DynamoDBTableIndex;
import sleeper.core.table.TableIndex;
import sleeper.core.table.TableStatus;
import sleeper.core.util.LoggedDuration;
import sleeper.core.util.SplitIntoBatches;
import sleeper.invoke.tables.InvokeForTables;

import java.time.Instant;
import java.util.List;
import java.util.UUID;

import static java.util.stream.Collectors.toUnmodifiableList;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.GARBAGE_COLLECTOR_QUEUE_URL;
import static sleeper.configuration.properties.instance.GarbageCollectionProperty.GARBAGE_COLLECT_OFFLINE_TABLES;
Expand Down Expand Up @@ -73,25 +67,11 @@ public Void handleRequest(ScheduledEvent event, Context context) {
String queueUrl = instanceProperties.get(GARBAGE_COLLECTOR_QUEUE_URL);
boolean offlineEnabled = instanceProperties.getBoolean(GARBAGE_COLLECT_OFFLINE_TABLES);
TableIndex tableIndex = new DynamoDBTableIndex(instanceProperties, dynamoClient);
SplitIntoBatches.reusingListOfSize(10,
offlineEnabled ? tableIndex.streamAllTables() : tableIndex.streamOnlineTables(),
tables -> sendMessageBatch(tables, queueUrl));
InvokeForTables.sendOneMessagePerTable(sqsClient, queueUrl,
offlineEnabled ? tableIndex.streamAllTables() : tableIndex.streamOnlineTables());

Instant finishTime = Instant.now();
LOGGER.info("Lambda finished at {} (ran for {})", finishTime, LoggedDuration.withFullOutput(startTime, finishTime));
return null;
}

private void sendMessageBatch(List<TableStatus> tables, String queueUrl) {
sqsClient.sendMessageBatch(new SendMessageBatchRequest()
.withQueueUrl(queueUrl)
.withEntries(tables.stream()
.map(table -> new SendMessageBatchRequestEntry()
.withMessageDeduplicationId(UUID.randomUUID().toString())
.withId(table.getTableUniqueId())
.withMessageGroupId(table.getTableUniqueId())
.withMessageBody(table.getTableUniqueId()))
.collect(toUnmodifiableList())));
}

}
5 changes: 5 additions & 0 deletions java/metrics/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@
<artifactId>statestore</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>sleeper</groupId>
<artifactId>common-invoke-tables</artifactId>
<version>${project.parent.version}</version>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>sleeper</groupId>
Expand Down
Loading

0 comments on commit 358d0f7

Please sign in to comment.