Skip to content

Commit

Permalink
Add test for creating multiple snapshots for the same table
Browse files Browse the repository at this point in the history
  • Loading branch information
kr565370 committed Apr 26, 2024
1 parent 7b92f9e commit c8d3337
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@
import sleeper.configuration.properties.instance.InstanceProperties;
import sleeper.configuration.properties.table.TableProperties;
import sleeper.configuration.properties.table.TableProperty;
import sleeper.core.partition.PartitionTree;
import sleeper.core.partition.PartitionsBuilder;
import sleeper.core.schema.Schema;
import sleeper.core.schema.type.LongType;
import sleeper.core.statestore.FileReference;
import sleeper.core.statestore.FileReferenceFactory;
import sleeper.core.statestore.StateStore;
import sleeper.statestore.transactionlog.DynamoDBTransactionLogSnapshotStore.LatestSnapshots;
Expand Down Expand Up @@ -103,6 +106,43 @@ void shouldCreateSnapshotsForMultipleTables() throws Exception {
partitionsSnapshot(table2, 1)));
}

/**
 * Runs the snapshot creator twice for a single table, with state store updates before each run,
 * and checks that snapshots numbered 1 and 2 exist for both files and partitions, with the
 * latest snapshots being the pair numbered 2.
 */
@Test
void shouldCreateMultipleSnapshotsForOneTable() throws Exception {
    // Given an initialised state store holding one root file, snapshotted once
    TableProperties table = createTable("test-table-id-1", "test-table-1");
    PartitionTree partitions = new PartitionsBuilder(schema)
            .rootFirst("root")
            .splitToNewChildren("root", "L", "R", 123L)
            .buildTree();
    FileReferenceFactory fileFactory = FileReferenceFactory.from(partitions);
    StateStore store = createStateStore(table);
    store.initialise();
    store.addFile(fileFactory.rootFile(123L));
    runSnapshotCreator(table);

    // When the root partition is updated with children L and R, a file is added on L,
    // and the snapshot creator runs a second time
    store.atomicallyUpdatePartitionAndCreateNewOnes(
            partitions.getPartition("root"),
            partitions.getPartition("L"),
            partitions.getPartition("R"));
    store.addFile(fileFactory.partitionFile("L", 456L));
    runSnapshotCreator(table);

    // Then the latest snapshots are the second pair
    assertThat(snapshotStore(table).getLatestSnapshots())
            .contains(new LatestSnapshots(filesSnapshot(table, 2), partitionsSnapshot(table, 2)));
    // And both generations of each snapshot type are still listed, in order
    assertThat(snapshotStore(table).getFilesSnapshots())
            .containsExactly(filesSnapshot(table, 1), filesSnapshot(table, 2));
    assertThat(snapshotStore(table).getPartitionsSnapshots())
            .containsExactly(partitionsSnapshot(table, 1), partitionsSnapshot(table, 2));
}

@Test
void shouldSkipCreatingSnapshotsIfStateHasNotUpdatedSinceLastSnapshot() throws Exception {
// Given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import static sleeper.core.schema.SchemaTestHelper.schemaWithKey;

public class TransactionLogStateStoreDynamoDBSpecificIT extends TransactionLogStateStoreTestBase {
protected static final Instant DEFAULT_UPDATE_TIME = Instant.parse("2024-04-26T13:00:00Z");
@TempDir
private Path tempDir;
private final Schema schema = schemaWithKey("key", new LongType());
Expand Down Expand Up @@ -121,7 +122,7 @@ void shouldLoadLatestSnapshotsWhenCreatingStateStore() throws Exception {
.splitToNewChildren("root", "L", "R", 123L)
.buildTree();
initialisePartitionsForSnapshot(tree, 1);
FileReferenceFactory factory = FileReferenceFactory.from(tree);
FileReferenceFactory factory = factory(tree);
List<FileReference> files = List.of(
factory.rootFile("file1.parquet", 100L),
factory.partitionFile("L", "file2.parquet", 25L),
Expand All @@ -130,12 +131,12 @@ void shouldLoadLatestSnapshotsWhenCreatingStateStore() throws Exception {
createSnapshot();

// When
StateStore stateStore = createStateStore();
StateStore stateStore = stateStore();

// Then
assertThat(stateStore.getAllPartitions())
.containsExactlyElementsOf(tree.getAllPartitions());
assertThat(stateStore.getFileReferences())
.usingRecursiveFieldByFieldElementComparatorIgnoringFields("lastStateStoreUpdateTime")
.containsExactlyElementsOf(files);
}

Expand All @@ -148,7 +149,7 @@ void shouldNotLoadLatestSnapshotsIfPropertyIsFalse() throws Exception {
.splitToNewChildren("root", "L", "R", 123L)
.buildTree();
initialisePartitionsForSnapshot(tree, 1);
FileReferenceFactory factory = FileReferenceFactory.from(tree);
FileReferenceFactory factory = factory(tree);
List<FileReference> files = List.of(
factory.rootFile("file1.parquet", 100L),
factory.partitionFile("L", "file2.parquet", 25L),
Expand All @@ -157,7 +158,7 @@ void shouldNotLoadLatestSnapshotsIfPropertyIsFalse() throws Exception {
createSnapshot();

// When
StateStore stateStore = createStateStore();
StateStore stateStore = stateStore();

// Then
assertThat(stateStore.getAllPartitions()).isEmpty();
Expand All @@ -167,12 +168,12 @@ void shouldNotLoadLatestSnapshotsIfPropertyIsFalse() throws Exception {
@Test
void shouldExcludePreviousTransactionsWhenLoadingLatestSnapshots() throws Exception {
// Given
StateStore stateStore = createStateStore();
StateStore stateStore = stateStore();
PartitionTree tree1 = new PartitionsBuilder(schema)
.rootFirst("root")
.splitToNewChildren("root", "A", "B", 123L)
.buildTree();
FileReferenceFactory factory1 = FileReferenceFactory.from(tree1);
FileReferenceFactory factory1 = factory(tree1);
FileReference file1 = factory1.rootFile("file1.parquet", 123L);
stateStore.initialise(tree1.getAllPartitions());
stateStore.addFile(file1);
Expand All @@ -181,20 +182,19 @@ void shouldExcludePreviousTransactionsWhenLoadingLatestSnapshots() throws Except
.rootFirst("root")
.splitToNewChildren("root", "C", "D", 456L)
.buildTree();
FileReferenceFactory factory2 = FileReferenceFactory.from(tree2);
FileReferenceFactory factory2 = factory(tree2);
FileReference file2 = factory2.rootFile("file2.parquet", 456L);
initialisePartitionsForSnapshot(tree2, 1);
addFilesForSnapshot(List.of(file2), 1);
createSnapshot();

// When
stateStore = createStateStore();
stateStore = stateStore();

// Then
assertThat(stateStore.getAllPartitions())
.containsExactlyElementsOf(tree2.getAllPartitions());
assertThat(stateStore.getFileReferences())
.usingRecursiveFieldByFieldElementComparatorIgnoringFields("lastStateStoreUpdateTime")
.containsExactly(file2);
}

Expand All @@ -209,7 +209,7 @@ void shouldLoadLatestPartitionsSnapshotIfNoFilesSnapshotIsPresent() throws Excep
createSnapshot();

// When
StateStore stateStore = createStateStore();
StateStore stateStore = stateStore();

// Then
assertThat(stateStore.getAllPartitions())
Expand All @@ -224,7 +224,7 @@ void shouldLoadLatestFilesSnapshotIfNoPartitionsSnapshotIsPresent() throws Excep
.rootFirst("root")
.splitToNewChildren("root", "L", "R", 123L)
.buildTree();
FileReferenceFactory factory = FileReferenceFactory.from(tree);
FileReferenceFactory factory = factory(tree);
List<FileReference> files = List.of(
factory.rootFile("file1.parquet", 100L),
factory.partitionFile("L", "file2.parquet", 25L),
Expand All @@ -233,24 +233,23 @@ void shouldLoadLatestFilesSnapshotIfNoPartitionsSnapshotIsPresent() throws Excep
createSnapshot();

// When
StateStore stateStore = createStateStore();
StateStore stateStore = stateStore();

// Then
assertThat(stateStore.getAllPartitions()).isEmpty();
assertThat(stateStore.getFileReferences())
.usingRecursiveFieldByFieldElementComparatorIgnoringFields("lastStateStoreUpdateTime")
.containsExactlyElementsOf(files);
}

/**
 * Adds a transaction log entry to {@code partitionsStore} wrapping an
 * {@code InitialisePartitionsTransaction} built from every partition of the given tree.
 *
 * NOTE(review): this page is a rendered commit diff without +/- markers, so the opening of the
 * {@code addTransaction} statement appears twice below. The {@code Instant.now()} line looks like
 * the removed version and the {@code DEFAULT_UPDATE_TIME} line like its replacement - confirm
 * against the repository.
 *
 * @param tree the partition tree whose partitions are passed to the transaction
 * @param transactionNumber the number given to the transaction log entry
 * @throws DuplicateTransactionNumberException declared here; presumably raised by the log store
 *         when the number is already in use - TODO confirm
 */
private void initialisePartitionsForSnapshot(PartitionTree tree, long transactionNumber) throws DuplicateTransactionNumberException {
partitionsStore.addTransaction(new TransactionLogEntry(transactionNumber, Instant.now(),
partitionsStore.addTransaction(new TransactionLogEntry(transactionNumber, DEFAULT_UPDATE_TIME,
new InitialisePartitionsTransaction(tree.getAllPartitions())));
}

/**
 * Adds a transaction log entry to {@code filesStore} wrapping an {@code AddFilesTransaction}.
 * Each given file reference is converted with {@code AllReferencesToAFile.fileWithOneReference}
 * before being collected into the transaction.
 *
 * NOTE(review): this page is a rendered commit diff without +/- markers, so two statements appear
 * in both their old and new form below. The {@code Instant.now()} lines look like the removed
 * versions and the {@code DEFAULT_UPDATE_TIME} lines like their replacements - confirm against
 * the repository.
 *
 * @param files the file references to include in the transaction
 * @param transactionNumber the number given to the transaction log entry
 * @throws DuplicateTransactionNumberException declared here; presumably raised by the log store
 *         when the number is already in use - TODO confirm
 */
private void addFilesForSnapshot(List<FileReference> files, long transactionNumber) throws DuplicateTransactionNumberException {
filesStore.addTransaction(new TransactionLogEntry(transactionNumber, Instant.now(),
filesStore.addTransaction(new TransactionLogEntry(transactionNumber, DEFAULT_UPDATE_TIME,
new AddFilesTransaction(files.stream()
.map(file -> AllReferencesToAFile.fileWithOneReference(file, Instant.now()))
.map(file -> AllReferencesToAFile.fileWithOneReference(file, DEFAULT_UPDATE_TIME))
.collect(Collectors.toList()))));
}

Expand All @@ -259,11 +258,25 @@ private void createSnapshot() {
instanceProperties, tableProperties, filesStore, partitionsStore, dynamoDBClient, configuration)
.createSnapshot();
}

/**
 * Builds a file reference factory for the given partition tree through
 * {@code FileReferenceFactory.fromUpdatedAt}, passing {@code DEFAULT_UPDATE_TIME} as the time.
 *
 * @param tree the partition tree handed to the factory
 * @return the factory produced for that tree
 */
private FileReferenceFactory factory(PartitionTree tree) {
    FileReferenceFactory fixedTimeFactory = FileReferenceFactory.fromUpdatedAt(tree, DEFAULT_UPDATE_TIME);
    return fixedTimeFactory;
}

/**
 * Obtains a state store from {@code createStateStore()} and fixes first its file update time and
 * then its partition update time to {@code DEFAULT_UPDATE_TIME}.
 *
 * @return the state store with both update times fixed
 */
private StateStore stateStore() {
    final StateStore fixedTimeStore = createStateStore();
    // Fix both clocks so file references and partitions carry a known update time
    fixedTimeStore.fixFileUpdateTime(DEFAULT_UPDATE_TIME);
    fixedTimeStore.fixPartitionUpdateTime(DEFAULT_UPDATE_TIME);
    return fixedTimeStore;
}
}

/**
 * Builds a state store with {@code DynamoDBTransactionLogStateStore.builderFrom}, using the
 * instance properties, table properties, DynamoDB client, S3 client and configuration in scope,
 * and setting the maximum number of add-transaction attempts to 1.
 *
 * NOTE(review): this page is a rendered commit diff without +/- markers. The line starting with
 * {@code return} looks like the removed version of the statement; the line assigning to
 * {@code stateStore}, the two {@code fix...UpdateTime(DEFAULT_UPDATE_TIME)} calls and the final
 * {@code return stateStore;} look like the added version - confirm against the repository.
 *
 * @return the built state store
 */
private StateStore createStateStore() {
return DynamoDBTransactionLogStateStore.builderFrom(instanceProperties, tableProperties, dynamoDBClient, s3Client, configuration)
StateStore stateStore = DynamoDBTransactionLogStateStore.builderFrom(instanceProperties, tableProperties, dynamoDBClient, s3Client, configuration)
.maxAddTransactionAttempts(1)
.build();
stateStore.fixFileUpdateTime(DEFAULT_UPDATE_TIME);
stateStore.fixPartitionUpdateTime(DEFAULT_UPDATE_TIME);
return stateStore;
}
}

0 comments on commit c8d3337

Please sign in to comment.