
Commit

Merge branch '1352-s3-state-store-for-all-tables' into 1353-table-metrics-for-all-tables
kr565370 committed Sep 28, 2023
2 parents d33b180 + 0547572 commit 03797b0
Showing 23 changed files with 166 additions and 226 deletions.
@@ -87,6 +87,7 @@
import static sleeper.configuration.properties.instance.SystemDefinedInstanceProperty.CONFIG_BUCKET;
import static sleeper.configuration.properties.instance.SystemDefinedInstanceProperty.DATA_BUCKET;
import static sleeper.configuration.properties.table.TableProperty.GARBAGE_COLLECTOR_DELAY_BEFORE_DELETION;
+import static sleeper.configuration.properties.table.TableProperty.REVISION_TABLENAME;
import static sleeper.configuration.properties.table.TableProperty.STATESTORE_CLASSNAME;
import static sleeper.configuration.properties.table.TableProperty.TABLE_NAME;
import static sleeper.configuration.testutils.LocalStackAwsV1ClientHelper.buildAwsV1Client;
@@ -187,8 +188,8 @@ public TableProperties createTableProperties(InstanceProperties instanceProperties
tableProperties.set(TABLE_NAME, tableName);
tableProperties.setSchema(schema);
tableProperties.set(GARBAGE_COLLECTOR_DELAY_BEFORE_DELETION, "10");
+tableProperties.set(REVISION_TABLENAME, tableName + "-r");
tableProperties.saveToS3(s3Client);

return tableProperties;
}

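The new REVISION_TABLENAME table property gives every Sleeper table its own revision table, named by suffixing "-r" to the table name. A minimal illustration of the convention (the table name is made up; that the revision table backs the S3 state store's revision tracking is an inference from the branch name, not something shown in this excerpt):

    // Illustrative only: the naming convention introduced by this commit.
    String tableName = "example-table";
    String revisionTableName = tableName + "-r"; // -> "example-table-r"
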
38 changes: 21 additions & 17 deletions java/cdk/src/main/java/sleeper/cdk/SleeperCdkApp.java
@@ -37,6 +37,7 @@
import sleeper.cdk.stack.PropertiesStack;
import sleeper.cdk.stack.QueryStack;
import sleeper.cdk.stack.S3StateStoreStack;
+import sleeper.cdk.stack.StateStoreStacks;
import sleeper.cdk.stack.TableDataStack;
import sleeper.cdk.stack.TableStack;
import sleeper.cdk.stack.TopicStack;
@@ -78,8 +79,7 @@ public class SleeperCdkApp extends Stack {
private PersistentEmrBulkImportStack persistentEmrBulkImportStack;
private EksBulkImportStack eksBulkImportStack;
private IngestStatusStoreStack ingestStatusStoreStack;
-private DynamoDBStateStoreStack dynamoDBStateStoreStack;
-private S3StateStoreStack s3StateStoreStack;
+private StateStoreStacks stateStoreStacks;

public SleeperCdkApp(App app, String id, StackProps props, InstanceProperties instanceProperties, BuiltJars jars) {
super(app, id, props);
@@ -125,14 +125,14 @@ public void create() {

// Stack for tables
dataStack = new TableDataStack(this, "TableData", instanceProperties);
-dynamoDBStateStoreStack = new DynamoDBStateStoreStack(this, "DynamoDBStateStore", instanceProperties);
-s3StateStoreStack = new S3StateStoreStack(this, "S3StateStore", instanceProperties, dataStack);
-tableStack = new TableStack(this, "Table", instanceProperties, jars, dataStack,
-        dynamoDBStateStoreStack, s3StateStoreStack);
+stateStoreStacks = new StateStoreStacks(
+        new DynamoDBStateStoreStack(this, "DynamoDBStateStore", instanceProperties),
+        new S3StateStoreStack(this, "S3StateStore", instanceProperties, dataStack));
+tableStack = new TableStack(this, "Table", instanceProperties, jars, dataStack, stateStoreStacks);

// Stack for Athena analytics
if (optionalStacks.contains(AthenaStack.class.getSimpleName())) {
-new AthenaStack(this, "Athena", instanceProperties, jars, tableStack, dataStack);
+new AthenaStack(this, "Athena", instanceProperties, jars, stateStoreStacks, dataStack);
}

if (INGEST_STACK_NAMES.stream().anyMatch(optionalStacks::contains)) {
@@ -143,7 +143,7 @@
}
if (EMR_BULK_IMPORT_STACK_NAMES.stream().anyMatch(optionalStacks::contains)) {
emrBulkImportCommonStack = new CommonEmrBulkImportStack(this, "BulkImportEMRCommon",
-instanceProperties, bulkImportBucketStack, tableStack, dataStack, ingestStatusStoreStack);
+instanceProperties, bulkImportBucketStack, stateStoreStacks, dataStack, ingestStatusStoreStack);
}

// Stack to run bulk import jobs via EMR Serverless
Expand All @@ -152,7 +152,7 @@ public void create() {
instanceProperties, jars,
bulkImportBucketStack,
topicStack,
-tableStack, dataStack,
+stateStoreStacks, dataStack,
ingestStatusStoreStack.getResources()
);
}
@@ -164,7 +164,7 @@
bulkImportBucketStack,
emrBulkImportCommonStack,
topicStack,
-tableStack.getStateStoreStacks(),
+stateStoreStacks,
ingestStatusStoreStack.getResources()
);
}
@@ -177,7 +177,7 @@
bulkImportBucketStack,
emrBulkImportCommonStack,
topicStack,
-tableStack.getStateStoreStacks(),
+stateStoreStacks,
ingestStatusStoreStack.getResources()
);
}
@@ -188,7 +188,7 @@
instanceProperties,
jars,
bulkImportBucketStack,
-tableStack, dataStack,
+stateStoreStacks, dataStack,
topicStack,
ingestStatusStoreStack
);
@@ -199,7 +199,7 @@
new GarbageCollectorStack(this,
"GarbageCollector",
instanceProperties, jars,
-tableStack, dataStack);
+stateStoreStacks, dataStack);
}

// Stack for containers for compactions and splitting compactions
@@ -208,15 +208,15 @@
"Compaction",
instanceProperties, jars,
topicStack.getTopic(),
-tableStack, dataStack);
+stateStoreStacks, dataStack);
}

// Stack to split partitions
if (optionalStacks.contains(PartitionSplittingStack.class.getSimpleName())) {
partitionSplittingStack = new PartitionSplittingStack(this,
"PartitionSplitting",
instanceProperties, jars,
-tableStack, dataStack,
+stateStoreStacks, dataStack,
topicStack.getTopic());
}

Expand All @@ -225,15 +225,15 @@ public void create() {
new QueryStack(this,
"Query",
instanceProperties, jars,
-tableStack, dataStack);
+stateStoreStacks, dataStack);
}

// Stack for ingest jobs
if (optionalStacks.contains(IngestStack.class.getSimpleName())) {
ingestStack = new IngestStack(this,
"Ingest",
instanceProperties, jars,
-tableStack, dataStack,
+stateStoreStacks, dataStack,
topicStack.getTopic(),
ingestStatusStoreStack);
}
@@ -272,6 +272,10 @@ public TableStack getTableStack() {
return tableStack;
}

+public StateStoreStacks getStateStoreStacks() {
+    return stateStoreStacks;
+}

public TableDataStack getDataStack() {
return dataStack;
}
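The StateStoreStacks class referenced above is new in this commit, but its source is not part of this excerpt. A hedged sketch of what it plausibly contains, reconstructed from the constructor call in SleeperCdkApp and the grant methods invoked by the stacks below; the comments mark where the S3 state store's side of each grant is assumed, since S3StateStoreStack's grant API is not shown:

    package sleeper.cdk.stack;

    import software.amazon.awscdk.services.iam.IGrantable;

    // Sketch only. The method names come from call sites in this diff; the
    // bodies are reconstructions using DynamoDBStateStoreStack's real methods.
    public class StateStoreStacks {
        private final DynamoDBStateStoreStack dynamo;
        private final S3StateStoreStack s3;

        public StateStoreStacks(DynamoDBStateStoreStack dynamo, S3StateStoreStack s3) {
            this.dynamo = dynamo;
            this.s3 = s3;
        }

        public void grantReadActiveFilesAndPartitions(IGrantable grantee) {
            dynamo.grantReadActiveFileMetadata(grantee);
            dynamo.grantReadPartitionMetadata(grantee);
            // Assumed: an equivalent read grant on the S3 state store.
        }

        public void grantReadPartitionsReadWriteActiveFiles(IGrantable grantee) {
            dynamo.grantReadPartitionMetadata(grantee);
            dynamo.grantReadWriteActiveFileMetadata(grantee);
            // Assumed: an equivalent grant on the S3 state store.
        }

        public void grantReadWriteActiveAndReadyForGCFiles(IGrantable grantee) {
            dynamo.grantReadWriteActiveFileMetadata(grantee);
            dynamo.grantReadWriteReadyForGCFileMetadata(grantee);
            // Assumed: an equivalent grant on the S3 state store.
        }

        public void grantReadWriteReadyForGCFiles(IGrantable grantee) {
            dynamo.grantReadWriteReadyForGCFileMetadata(grantee);
            // Assumed: an equivalent grant on the S3 state store.
        }
    }
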
7 changes: 2 additions & 5 deletions java/cdk/src/main/java/sleeper/cdk/stack/AthenaStack.java
@@ -58,7 +58,7 @@
@SuppressFBWarnings("NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE")
public class AthenaStack extends NestedStack {
public AthenaStack(Construct scope, String id, InstanceProperties instanceProperties, BuiltJars jars,
-TableStack tableStack, TableDataStack dataStack) {
+StateStoreStacks stateStoreStacks, TableDataStack dataStack) {
super(scope, id);

String instanceId = instanceProperties.get(ID);
@@ -125,10 +125,7 @@ public AthenaStack(Construct scope, String id, InstanceProperties instanceProperties

jarsBucket.grantRead(handler);

-tableStack.getStateStoreStacks().forEach(sss -> {
-    sss.grantReadActiveFileMetadata(handler);
-    sss.grantReadPartitionMetadata(handler);
-});
+stateStoreStacks.grantReadActiveFilesAndPartitions(handler);
dataStack.getDataBucket().grantRead(handler);
configBucket.grantRead(handler);
spillBucket.grantReadWrite(handler);
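
This hunk shows the pattern repeated across the whole commit: each consumer previously iterated a List<StateStoreStack> and combined the individual grants itself, whereas now a single aggregate method names the intent. Side by side, using the exact lines from this hunk:

    // Before: every consumer spelled out the combination of grants.
    tableStack.getStateStoreStacks().forEach(sss -> {
        sss.grantReadActiveFileMetadata(handler);
        sss.grantReadPartitionMetadata(handler);
    });

    // After: one call per permission set; a new state store implementation
    // can be wired up inside StateStoreStacks without touching consumers.
    stateStoreStacks.grantReadActiveFilesAndPartitions(handler);
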
27 changes: 10 additions & 17 deletions java/cdk/src/main/java/sleeper/cdk/stack/CompactionStack.java
@@ -175,7 +175,7 @@ public CompactionStack(
InstanceProperties instanceProperties,
BuiltJars jars,
Topic topic,
-TableStack tableStack, TableDataStack dataStack) {
+StateStoreStacks stateStoreStacks, TableDataStack dataStack) {
super(scope, id);
this.instanceProperties = instanceProperties;
eventStore = CompactionStatusStoreStack.from(this, instanceProperties);
@@ -212,13 +212,13 @@ public CompactionStack(

// Lambda to periodically check for compaction jobs that should be created
lambdaToFindCompactionJobsThatShouldBeCreated(configBucket, jarsBucket, jobCreatorJar,
-        tableStack.getStateStoreStacks(), compactionJobsQueue, splittingCompactionJobsQueue);
+        stateStoreStacks, compactionJobsQueue, splittingCompactionJobsQueue);

// ECS cluster for compaction tasks
-ecsClusterForCompactionTasks(configBucket, jarsBucket, taskCreatorJar, tableStack, dataStack, compactionJobsQueue);
+ecsClusterForCompactionTasks(configBucket, jarsBucket, taskCreatorJar, stateStoreStacks, dataStack, compactionJobsQueue);

// ECS cluster for splitting compaction tasks
-ecsClusterForSplittingCompactionTasks(configBucket, jarsBucket, taskCreatorJar, tableStack, dataStack,
+ecsClusterForSplittingCompactionTasks(configBucket, jarsBucket, taskCreatorJar, stateStoreStacks, dataStack,
splittingCompactionJobsQueue);

// Lambda to create compaction tasks
@@ -344,7 +344,7 @@ private Queue sqsQueueForSplittingCompactionJobs(Topic topic) {
private void lambdaToFindCompactionJobsThatShouldBeCreated(IBucket configBucket,
IBucket jarsBucket,
LambdaCode jobCreatorJar,
-List<StateStoreStack> stateStoreStacks,
+StateStoreStacks stateStoreStacks,
Queue compactionMergeJobsQueue,
Queue compactionSplittingMergeJobsQueue) {

@@ -368,8 +368,7 @@ private void lambdaToFindCompactionJobsThatShouldBeCreated(IBucket configBucket,
// Grant this function permission to read from / write to the DynamoDB table
configBucket.grantRead(handler);
jarsBucket.grantRead(handler);
-stateStoreStacks.forEach(stateStoreStack -> stateStoreStack.grantReadWriteActiveFileMetadata(handler));
-stateStoreStacks.forEach(stateStoreStack -> stateStoreStack.grantReadPartitionMetadata(handler));
+stateStoreStacks.grantReadPartitionsReadWriteActiveFiles(handler);
eventStore.grantWriteJobEvent(handler);

// Grant this function permission to put messages on the compaction
@@ -393,7 +392,7 @@ private void lambdaToFindCompactionJobsThatShouldBeCreated(IBucket configBucket,
private Cluster ecsClusterForCompactionTasks(IBucket configBucket,
IBucket jarsBucket,
LambdaCode taskCreatorJar,
-TableStack tableStack, TableDataStack dataStack,
+StateStoreStacks stateStoreStacks, TableDataStack dataStack,
Queue compactionMergeJobsQueue) {
VpcLookupOptions vpcLookupOptions = VpcLookupOptions.builder()
.vpcId(instanceProperties.get(VPC_ID))
@@ -420,10 +419,7 @@ private Cluster ecsClusterForCompactionTasks(IBucket configBucket,
configBucket.grantRead(taskDef.getTaskRole());
jarsBucket.grantRead(taskDef.getTaskRole());
dataStack.getDataBucket().grantReadWrite(taskDef.getTaskRole());
-tableStack.getStateStoreStacks().forEach(
-        stateStoreStack -> stateStoreStack.grantReadWriteActiveFileMetadata(taskDef.getTaskRole()));
-tableStack.getStateStoreStacks().forEach(
-        stateStoreStack -> stateStoreStack.grantReadWriteReadyForGCFileMetadata(taskDef.getTaskRole()));
+stateStoreStacks.grantReadWriteActiveAndReadyForGCFiles(taskDef.getTaskRole());
eventStore.grantWriteJobEvent(taskDef.getTaskRole());
eventStore.grantWriteTaskEvent(taskDef.getTaskRole());

@@ -468,7 +464,7 @@ private Cluster ecsClusterForCompactionTasks(IBucket configBucket,
private Cluster ecsClusterForSplittingCompactionTasks(IBucket configBucket,
IBucket jarsBucket,
LambdaCode taskCreatorJar,
-TableStack tableStack, TableDataStack dataStack,
+StateStoreStacks stateStoreStacks, TableDataStack dataStack,
Queue compactionSplittingMergeJobsQueue) {
VpcLookupOptions vpcLookupOptions = VpcLookupOptions.builder()
.vpcId(instanceProperties.get(VPC_ID))
@@ -495,10 +491,7 @@ private Cluster ecsClusterForSplittingCompactionTasks(IBucket configBucket,
configBucket.grantRead(taskDef.getTaskRole());
jarsBucket.grantRead(taskDef.getTaskRole());
dataStack.getDataBucket().grantReadWrite(taskDef.getTaskRole());
-tableStack.getStateStoreStacks().forEach(
-        stateStoreStack -> stateStoreStack.grantReadWriteActiveFileMetadata(taskDef.getTaskRole()));
-tableStack.getStateStoreStacks().forEach(
-        stateStoreStack -> stateStoreStack.grantReadWriteReadyForGCFileMetadata(taskDef.getTaskRole()));
+stateStoreStacks.grantReadWriteActiveAndReadyForGCFiles(taskDef.getTaskRole());
eventStore.grantWriteJobEvent(taskDef.getTaskRole());
eventStore.grantWriteTaskEvent(taskDef.getTaskRole());

java/cdk/src/main/java/sleeper/cdk/stack/DynamoDBStateStoreStack.java
@@ -35,7 +35,7 @@
import static sleeper.configuration.properties.instance.SystemDefinedInstanceProperty.PARTITION_TABLENAME;
import static sleeper.configuration.properties.instance.SystemDefinedInstanceProperty.READY_FOR_GC_FILEINFO_TABLENAME;

-public class DynamoDBStateStoreStack extends NestedStack implements StateStoreStack {
+public class DynamoDBStateStoreStack extends NestedStack {
private final Table activeFileInfoTable;
private final Table readyForGCFileInfoTable;
private final Table partitionTable;
@@ -104,32 +104,22 @@ public DynamoDBStateStoreStack(Construct scope, String id, InstanceProperties instanceProperties
instanceProperties.set(PARTITION_TABLENAME, partitionTable.getTableName());
}

-@Override
public void grantReadActiveFileMetadata(IGrantable grantee) {
activeFileInfoTable.grantReadData(grantee);
}

-@Override
public void grantReadWriteActiveFileMetadata(IGrantable grantee) {
activeFileInfoTable.grantReadWriteData(grantee);
}

-@Override
public void grantReadWriteReadyForGCFileMetadata(IGrantable grantee) {
readyForGCFileInfoTable.grantReadWriteData(grantee);
}

-@Override
public void grantWriteReadyForGCFileMetadata(IGrantable grantee) {
readyForGCFileInfoTable.grantWriteData(grantee);
}

-@Override
public void grantReadPartitionMetadata(IGrantable grantee) {
partitionTable.grantReadData(grantee);
}

-@Override
public void grantReadWritePartitionMetadata(IGrantable grantee) {
partitionTable.grantReadWriteData(grantee);
}
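
The @Override annotations deleted above belonged to the StateStoreStack interface that this class no longer implements. A reconstruction of what that interface presumably declared, inferred from the six annotated methods in this hunk rather than taken from the source tree:

    import software.amazon.awscdk.services.iam.IGrantable;

    // Inferred sketch of the retired interface; the real declaration is not shown.
    public interface StateStoreStack {
        void grantReadActiveFileMetadata(IGrantable grantee);
        void grantReadWriteActiveFileMetadata(IGrantable grantee);
        void grantReadWriteReadyForGCFileMetadata(IGrantable grantee);
        void grantWriteReadyForGCFileMetadata(IGrantable grantee);
        void grantReadPartitionMetadata(IGrantable grantee);
        void grantReadWritePartitionMetadata(IGrantable grantee);
    }
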
java/cdk/src/main/java/sleeper/cdk/stack/GarbageCollectorStack.java
@@ -56,7 +56,7 @@ public GarbageCollectorStack(
String id,
InstanceProperties instanceProperties,
BuiltJars jars,
-TableStack tableStack, TableDataStack dataStack) {
+StateStoreStacks stateStoreStacks, TableDataStack dataStack) {
super(scope, id);

// Config bucket
@@ -91,7 +91,7 @@ public GarbageCollectorStack(
configBucket.grantRead(handler);
dataStack.getDataBucket().grantRead(handler);
dataStack.getDataBucket().grantDelete(handler);
-tableStack.getStateStoreStacks().forEach(stateStoreStack -> stateStoreStack.grantReadWriteReadyForGCFileMetadata(handler));
+stateStoreStacks.grantReadWriteReadyForGCFiles(handler);

// Cloudwatch rule to trigger this lambda
Rule rule = Rule.Builder
9 changes: 4 additions & 5 deletions java/cdk/src/main/java/sleeper/cdk/stack/IngestStack.java
@@ -109,7 +109,7 @@ public IngestStack(
String id,
InstanceProperties instanceProperties,
BuiltJars jars,
-TableStack tableStack,
+StateStoreStacks stateStoreStacks,
TableDataStack dataStack,
Topic topic,
IngestStatusStoreStack statusStoreStack) {
@@ -136,7 +136,7 @@
sqsQueueForIngestJobs(topic);

// ECS cluster for ingest tasks
-ecsClusterForIngestTasks(configBucket, jarsBucket, tableStack, dataStack, ingestJobQueue);
+ecsClusterForIngestTasks(configBucket, jarsBucket, stateStoreStacks, dataStack, ingestJobQueue);

// Lambda to create ingest tasks
lambdaToCreateIngestTasks(configBucket, ingestJobQueue, taskCreatorJar);
@@ -234,7 +234,7 @@ private Queue sqsQueueForIngestJobs(Topic topic) {
private Cluster ecsClusterForIngestTasks(
IBucket configBucket,
IBucket jarsBucket,
-TableStack tableStack,
+StateStoreStacks stateStoreStacks,
TableDataStack dataStack,
Queue ingestJobQueue) {
VpcLookupOptions vpcLookupOptions = VpcLookupOptions.builder()
@@ -274,8 +274,7 @@ private Cluster ecsClusterForIngestTasks(
configBucket.grantRead(taskDefinition.getTaskRole());
jarsBucket.grantRead(taskDefinition.getTaskRole());
dataStack.getDataBucket().grantReadWrite(taskDefinition.getTaskRole());
-tableStack.getStateStoreStacks().forEach(stateStoreStack -> stateStoreStack.grantReadWriteActiveFileMetadata(taskDefinition.getTaskRole()));
-tableStack.getStateStoreStacks().forEach(stateStoreStack -> stateStoreStack.grantReadPartitionMetadata(taskDefinition.getTaskRole()));
+stateStoreStacks.grantReadPartitionsReadWriteActiveFiles(taskDefinition.getTaskRole());
statusStore.grantWriteJobEvent(taskDefinition.getTaskRole());
statusStore.grantWriteTaskEvent(taskDefinition.getTaskRole());
ingestJobQueue.grantConsumeMessages(taskDefinition.getTaskRole());
(Diff truncated: the remaining changed files of the 23 were not loaded on this page.)
