Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 2271 - Load latest snapshot when first using transaction log state store #2355

Merged
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
7285faa
Start adding tests
kr565370 Apr 22, 2024
475419b
Merge branch '2272-track-transactionlog-snapshots-in-dynamodb' into 2…
kr565370 Apr 22, 2024
54e69f1
Merge branch '2272-track-transactionlog-snapshots-in-dynamodb' into 2…
kr565370 Apr 22, 2024
e0ecc74
Merge branch '2272-track-transactionlog-snapshots-in-dynamodb' into 2…
kr565370 Apr 22, 2024
ed60b52
Create test for loading latest snapshot when creating state store object
kr565370 Apr 22, 2024
a296f6e
Fix failing test, use snapshot loading code in every test
kr565370 Apr 23, 2024
bc42cc0
Use Optional.ifPresent in DynamoDBTransactionLogStateStore.builderFrom
kr565370 Apr 23, 2024
25da78e
Move serde definitions into ifPresent block
kr565370 Apr 23, 2024
2f7a3d6
Add property for enabling loading of latest snapshots
kr565370 Apr 23, 2024
54006ba
Use delegating builder method
kr565370 Apr 23, 2024
a9f4c07
Create snapshot tables in CDK
kr565370 Apr 23, 2024
5d692e6
Merge branch '2272-track-transactionlog-snapshots-in-dynamodb' into 2…
kr565370 Apr 23, 2024
ed79a82
Merge branch 'develop' into 2271-load-latest-snapshot-when-first-usin…
patchwork01 Apr 24, 2024
f18460a
Rename tables created by CDK
kr565370 Apr 24, 2024
274b133
Move call to create snapshot store inside state store creator
kr565370 Apr 24, 2024
1dd1505
Remove old static builder method
kr565370 Apr 24, 2024
dd0332b
Add test for skipping previous transactions when loading latest snaps…
kr565370 Apr 24, 2024
df2105f
Stop wrapping IOException in StateStoreFileUtils
kr565370 Apr 24, 2024
9a2c2f7
Combine snapshot serde classes. Pass snapshot objects to store and se…
kr565370 Apr 24, 2024
d74ae40
Keep names of transaction log properties and tables consistent
kr565370 Apr 24, 2024
410e52c
Convert load latest snapshots property to table property
kr565370 Apr 24, 2024
1099ec5
Add tests for loading files snapshot when partitions snapshot is not …
kr565370 Apr 25, 2024
195d7e6
Merge branch 'develop' into 2271-load-latest-snapshot-when-first-usin…
gaffer01 Apr 26, 2024
d4d1b08
Hold optional snapshots inside LatestSnapshot
kr565370 Apr 26, 2024
2d6a866
Revert "Hold optional snapshots inside LatestSnapshot"
kr565370 Apr 26, 2024
d583ed5
Merge branch 'develop' into 2271-load-latest-snapshot-when-first-usin…
kr565370 Apr 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions example/full/instance.properties
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ sleeper.metadata.dynamo.pointintimerecovery=false
# revision DynamoDB table.
sleeper.metadata.s3.dynamo.pointintimerecovery=false

# If set, transaction log state stores will load the latest snapshot from the snapshot store when
# created.
sleeper.metadata.transactionlog.load.latest.snapshots=true

# This specifies whether point in time recovery is enabled for the Sleeper table index. This is set on
# the DynamoDB tables.
sleeper.tables.index.dynamo.pointintimerecovery=false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,34 @@
import software.constructs.Construct;

import sleeper.configuration.properties.instance.InstanceProperties;
import sleeper.statestore.transactionlog.DynamoDBTransactionLogSnapshotStore;
import sleeper.statestore.transactionlog.DynamoDBTransactionLogStateStore;

import static sleeper.cdk.Utils.removalPolicy;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.FILE_TRANSACTION_LOG_TABLENAME;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.PARTITION_TRANSACTION_LOG_TABLENAME;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_ALL_SNAPSHOTS_TABLENAME;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_LATEST_SNAPSHOTS_TABLENAME;
import static sleeper.configuration.properties.instance.CommonProperty.ID;

public class TransactionLogStateStoreStack extends NestedStack {
private final Table partitionsLogTable;
private final Table filesLogTable;
private final Table latestSnapshotTable;
private final Table allSnapshotsTable;

/**
 * Builds the nested stack holding the DynamoDB tables used by the transaction log state store:
 * the partition and file transaction logs, plus the latest-snapshot and all-snapshots tables.
 * The name of every table created here is written back into the instance properties.
 *
 * @param scope              the construct this nested stack is created under
 * @param id                 the construct ID of this nested stack
 * @param instanceProperties the instance properties, read for table naming and updated with the table names
 */
public TransactionLogStateStoreStack(
        Construct scope, String id, InstanceProperties instanceProperties) {
    super(scope, id);

    // Transaction log tables
    this.partitionsLogTable = createTransactionLogTable(
            instanceProperties, "PartitionTransactionLogTable", "partition-transaction-log");
    this.filesLogTable = createTransactionLogTable(
            instanceProperties, "FileTransactionLogTable", "file-transaction-log");

    // Snapshot tracking tables
    this.latestSnapshotTable = createLatestSnapshotTable(
            instanceProperties, "TransactionLogLatestSnapshotTable", "tl-latest-snapshot");
    this.allSnapshotsTable = createAllSnapshotsTable(
            instanceProperties, "TransactionLogAllSnapshotsTable", "tl-all-snapshots-table");

    // Record the table names in the instance properties
    instanceProperties.set(PARTITION_TRANSACTION_LOG_TABLENAME, this.partitionsLogTable.getTableName());
    instanceProperties.set(FILE_TRANSACTION_LOG_TABLENAME, this.filesLogTable.getTableName());
    instanceProperties.set(TRANSACTION_LOG_LATEST_SNAPSHOTS_TABLENAME, this.latestSnapshotTable.getTableName());
    instanceProperties.set(TRANSACTION_LOG_ALL_SNAPSHOTS_TABLENAME, this.allSnapshotsTable.getTableName());
}

private Table createTransactionLogTable(InstanceProperties instanceProperties, String id, String name) {
Expand All @@ -62,6 +71,36 @@ private Table createTransactionLogTable(InstanceProperties instanceProperties, S
.build();
}

/**
 * Defines the DynamoDB table that tracks the latest snapshots. The table is billed per request
 * and keyed only by {@code DynamoDBTransactionLogSnapshotStore.TABLE_ID} (string partition key).
 *
 * @param instanceProperties the instance properties, read for the instance ID and removal policy
 * @param id                 the construct ID of the table within this stack
 * @param name               the suffix appended to {@code sleeper-<instance id>-} to form the table name
 * @return the table construct
 */
private Table createLatestSnapshotTable(InstanceProperties instanceProperties, String id, String name) {
    String fullTableName = String.join("-", "sleeper", instanceProperties.get(ID), name);
    Attribute tableIdKey = Attribute.builder()
            .name(DynamoDBTransactionLogSnapshotStore.TABLE_ID)
            .type(AttributeType.STRING)
            .build();
    return Table.Builder.create(this, id)
            .tableName(fullTableName)
            .removalPolicy(removalPolicy(instanceProperties))
            .billingMode(BillingMode.PAY_PER_REQUEST)
            .partitionKey(tableIdKey)
            .build();
}

/**
 * Defines the DynamoDB table that tracks all snapshots. The table is billed per request, with
 * {@code DynamoDBTransactionLogSnapshotStore.TABLE_ID_AND_SNAPSHOT_TYPE} as a string partition key
 * and {@code DynamoDBTransactionLogSnapshotStore.TRANSACTION_NUMBER} as a numeric sort key.
 *
 * @param instanceProperties the instance properties, read for the instance ID and removal policy
 * @param id                 the construct ID of the table within this stack
 * @param name               the suffix appended to {@code sleeper-<instance id>-} to form the table name
 * @return the table construct
 */
private Table createAllSnapshotsTable(InstanceProperties instanceProperties, String id, String name) {
    String fullTableName = String.join("-", "sleeper", instanceProperties.get(ID), name);
    Attribute tableAndTypeKey = Attribute.builder()
            .name(DynamoDBTransactionLogSnapshotStore.TABLE_ID_AND_SNAPSHOT_TYPE)
            .type(AttributeType.STRING)
            .build();
    Attribute transactionNumberKey = Attribute.builder()
            .name(DynamoDBTransactionLogSnapshotStore.TRANSACTION_NUMBER)
            .type(AttributeType.NUMBER)
            .build();
    return Table.Builder.create(this, id)
            .tableName(fullTableName)
            .removalPolicy(removalPolicy(instanceProperties))
            .billingMode(BillingMode.PAY_PER_REQUEST)
            .partitionKey(tableAndTypeKey)
            .sortKey(transactionNumberKey)
            .build();
}

/**
 * Grants read access on the files transaction log table.
 *
 * @param grantee the principal to be given read access to the table's data
 */
public void grantReadFiles(IGrantable grantee) {
filesLogTable.grantReadData(grantee);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,12 @@ public interface CommonProperty {
.validationPredicate(Utils::isTrueOrFalse)
.propertyGroup(InstancePropertyGroup.COMMON)
.runCdkDeployWhenChanged(true).build();
/**
 * Instance property controlling whether transaction log state stores load the latest snapshot
 * from the snapshot store when they are created. Defaults to "true"; values are validated with
 * {@code Utils::isTrueOrFalse}. Unlike the neighbouring properties, this one does not set
 * {@code runCdkDeployWhenChanged}.
 */
UserDefinedInstanceProperty TRANSACTION_LOG_STATE_STORE_LOAD_LATEST_SNAPSHOTS = Index.propertyBuilder("sleeper.metadata.transactionlog.load.latest.snapshots")
.description("If set, transaction log state stores will load the latest snapshot from the snapshot store when created.")
.defaultValue("true")
.validationPredicate(Utils::isTrueOrFalse)
.propertyGroup(InstancePropertyGroup.COMMON)
.build();
patchwork01 marked this conversation as resolved.
Show resolved Hide resolved
UserDefinedInstanceProperty TABLE_INDEX_DYNAMO_POINT_IN_TIME_RECOVERY = Index.propertyBuilder("sleeper.tables.index.dynamo.pointintimerecovery")
.description("This specifies whether point in time recovery is enabled for the Sleeper table index. " +
"This is set on the DynamoDB tables.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ public class DynamoDBTransactionLogSnapshotStore {
private final AmazonDynamoDB dynamo;
private final Supplier<Instant> timeSupplier;

/**
 * Creates a snapshot store that takes its timestamps from {@code Instant::now}. Delegates to the
 * constructor that accepts an explicit time supplier.
 *
 * @param instanceProperties the instance properties
 * @param tableProperties    the properties of the Sleeper table
 * @param dynamo             the DynamoDB client
 */
public DynamoDBTransactionLogSnapshotStore(InstanceProperties instanceProperties, TableProperties tableProperties, AmazonDynamoDB dynamo) {
this(instanceProperties, tableProperties, dynamo, Instant::now);
}

public DynamoDBTransactionLogSnapshotStore(InstanceProperties instanceProperties, TableProperties tableProperties, AmazonDynamoDB dynamo, Supplier<Instant> timeSupplier) {
this.allSnapshotsTable = instanceProperties.get(TRANSACTION_LOG_ALL_SNAPSHOTS_TABLENAME);
this.latestSnapshotsTable = instanceProperties.get(TRANSACTION_LOG_LATEST_SNAPSHOTS_TABLENAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void create() {
createAllSnapshotsTable();
}

public void createLatestSnapshotTable() {
private void createLatestSnapshotTable() {
List<AttributeDefinition> attributeDefinitions = List.of(
new AttributeDefinition(DynamoDBTransactionLogSnapshotStore.TABLE_ID, ScalarAttributeType.S));
List<KeySchemaElement> keySchemaElements = List.of(
Expand All @@ -57,7 +57,7 @@ public void createLatestSnapshotTable() {
dynamoDB.createTable(request);
}

public void createAllSnapshotsTable() {
private void createAllSnapshotsTable() {
List<AttributeDefinition> attributeDefinitions = List.of(
new AttributeDefinition(DynamoDBTransactionLogSnapshotStore.TABLE_ID_AND_SNAPSHOT_TYPE, ScalarAttributeType.S),
new AttributeDefinition(DynamoDBTransactionLogSnapshotStore.TRANSACTION_NUMBER, ScalarAttributeType.N));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.s3.AmazonS3;
import org.apache.hadoop.conf.Configuration;

import sleeper.configuration.properties.instance.InstanceProperties;
import sleeper.configuration.properties.table.TableProperties;
import sleeper.core.statestore.StateStoreException;
import sleeper.core.statestore.transactionlog.TransactionLogStateStore;

import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.FILE_TRANSACTION_LOG_TABLENAME;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.PARTITION_TRANSACTION_LOG_TABLENAME;
import static sleeper.configuration.properties.instance.CommonProperty.TRANSACTION_LOG_STATE_STORE_LOAD_LATEST_SNAPSHOTS;

public class DynamoDBTransactionLogStateStore extends TransactionLogStateStore {
public static final String TABLE_ID = "TABLE_ID";
Expand All @@ -36,11 +39,35 @@ public DynamoDBTransactionLogStateStore(

public static TransactionLogStateStore.Builder builderFrom(
InstanceProperties instanceProperties, TableProperties tableProperties, AmazonDynamoDB dynamoDB, AmazonS3 s3) {
return builder()
return builderFrom(instanceProperties, tableProperties, dynamoDB, s3, new Configuration());
patchwork01 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
 * Creates a state store builder backed by DynamoDB transaction logs for files and partitions.
 * When the instance property {@code TRANSACTION_LOG_STATE_STORE_LOAD_LATEST_SNAPSHOTS} is true,
 * the latest snapshots are also loaded into the builder.
 *
 * @param instanceProperties the instance properties, read for table names and the snapshot loading flag
 * @param tableProperties    the properties of the Sleeper table, read for its status and schema
 * @param dynamoDB           the DynamoDB client
 * @param s3                 the S3 client passed to the transaction log stores
 * @param configuration      the Hadoop configuration used when loading snapshots
 * @return the configured builder
 */
public static TransactionLogStateStore.Builder builderFrom(
        InstanceProperties instanceProperties, TableProperties tableProperties, AmazonDynamoDB dynamoDB, AmazonS3 s3, Configuration configuration) {
    String filesLogTableName = instanceProperties.get(FILE_TRANSACTION_LOG_TABLENAME);
    String partitionsLogTableName = instanceProperties.get(PARTITION_TRANSACTION_LOG_TABLENAME);
    Builder stateStoreBuilder = builder()
            .sleeperTable(tableProperties.getStatus())
            .schema(tableProperties.getSchema())
            .filesLogStore(new DynamoDBTransactionLogStore(
                    filesLogTableName, instanceProperties, tableProperties, dynamoDB, s3))
            .partitionsLogStore(new DynamoDBTransactionLogStore(
                    partitionsLogTableName, instanceProperties, tableProperties, dynamoDB, s3));
    boolean shouldLoadSnapshots = instanceProperties.getBoolean(TRANSACTION_LOG_STATE_STORE_LOAD_LATEST_SNAPSHOTS);
    if (shouldLoadSnapshots) {
        loadLatestSnapshots(stateStoreBuilder, instanceProperties, tableProperties, dynamoDB, configuration);
    }
    return stateStoreBuilder;
}

private static void loadLatestSnapshots(
TransactionLogStateStore.Builder builder, InstanceProperties instanceProperties, TableProperties tableProperties,
AmazonDynamoDB dynamoDB, Configuration configuration) {
new DynamoDBTransactionLogSnapshotStore(instanceProperties, tableProperties, dynamoDB).getLatestSnapshots()
.ifPresent(latestSnapshots -> {
TransactionLogFilesSnapshotSerDe filesSnapshotSerDe = new TransactionLogFilesSnapshotSerDe(configuration);
TransactionLogPartitionsSnapshotSerDe partitionsSnapshotSerDe = new TransactionLogPartitionsSnapshotSerDe(tableProperties.getSchema(), configuration);
try {
builder.filesState(filesSnapshotSerDe.load(latestSnapshots.getFilesSnapshot()))
.partitionsState(partitionsSnapshotSerDe.load(latestSnapshots.getPartitionsSnapshot()));
patchwork01 marked this conversation as resolved.
Show resolved Hide resolved
} catch (StateStoreException e) {
patchwork01 marked this conversation as resolved.
Show resolved Hide resolved
throw new RuntimeException(e);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,19 @@ public class TransactionLogFilesSnapshotSerDe {
this.stateStoreFileUtils = StateStoreFileUtils.forFiles(configuration);
}

void save(String basePath, StateStoreFiles state, long lastTransactionNumber) throws StateStoreException {
stateStoreFileUtils.saveFiles(createFilesPath(basePath, lastTransactionNumber), state);
String save(String basePath, StateStoreFiles state, long lastTransactionNumber) throws StateStoreException {
String filePath = createFilesPath(basePath, lastTransactionNumber);
stateStoreFileUtils.saveFiles(filePath, state);
return filePath;
}

StateStoreFiles load(String basePath, long lastTransactionNumber) throws StateStoreException {
StateStoreFiles load(TransactionLogSnapshot snapshot) throws StateStoreException {
return load(snapshot.getPath());
}

StateStoreFiles load(String filePath) throws StateStoreException {
StateStoreFiles files = new StateStoreFiles();
stateStoreFileUtils.loadFiles(createFilesPath(basePath, lastTransactionNumber), files::add);
stateStoreFileUtils.loadFiles(filePath, files::add);
return files;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,19 @@ public class TransactionLogPartitionsSnapshotSerDe {
this.stateStoreFileUtils = StateStoreFileUtils.forPartitions(configuration);
}

void save(String basePath, StateStorePartitions state, long lastTransactionNumber) throws StateStoreException {
stateStoreFileUtils.savePartitions(createPartitionsPath(basePath, lastTransactionNumber), state, sleeperSchema);
String save(String basePath, StateStorePartitions state, long lastTransactionNumber) throws StateStoreException {
String filePath = createPartitionsPath(basePath, lastTransactionNumber);
stateStoreFileUtils.savePartitions(filePath, state, sleeperSchema);
return filePath;
}

StateStorePartitions load(String basePath, long lastTransactionNumber) throws StateStoreException {
StateStorePartitions load(TransactionLogSnapshot snapshot) throws StateStoreException {
return load(snapshot.getPath());
}

StateStorePartitions load(String filePath) throws StateStoreException {
StateStorePartitions partitions = new StateStorePartitions();
stateStoreFileUtils.loadPartitions(createPartitionsPath(basePath, lastTransactionNumber), sleeperSchema, partitions::put);
stateStoreFileUtils.loadPartitions(filePath, sleeperSchema, partitions::put);
return partitions;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ void shouldSaveAndLoadPartitionsState() throws StateStoreException {

// When
TransactionLogPartitionsSnapshotSerDe snapshot = new TransactionLogPartitionsSnapshotSerDe(schema, configuration);
snapshot.save(tempDir.toString(), state, 1);
String filePath = snapshot.save(tempDir.toString(), state, 1);

// Then
assertThat(snapshot.load(tempDir.toString(), 1)).isEqualTo(state);
assertThat(snapshot.load(filePath)).isEqualTo(state);
patchwork01 marked this conversation as resolved.
Show resolved Hide resolved
}

@Test
Expand All @@ -67,10 +67,10 @@ void shouldSaveAndLoadFilesState() throws StateStoreException {

// When
TransactionLogFilesSnapshotSerDe snapshot = new TransactionLogFilesSnapshotSerDe(configuration);
snapshot.save(tempDir.toString(), state, 1);
String filePath = snapshot.save(tempDir.toString(), state, 1);

// Then
assertThat(snapshot.load(tempDir.toString(), 1)).isEqualTo(state);
assertThat(snapshot.load(filePath)).isEqualTo(state);
}

private FileReferenceFactory fileFactory() {
Expand Down
Loading
Loading