Skip to content

Commit

Permalink
Use state store to setup snapshot state in state store tests
Browse files Browse the repository at this point in the history
  • Loading branch information
patchwork01 committed Apr 30, 2024
1 parent 243e59f commit 1204f3e
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,23 @@ void shouldNotCreateSnapshotForTableWithNoTransactions() throws Exception {
assertThat(snapshotStore(table).getPartitionsSnapshots()).isEmpty();
}

@Test
void shouldNotCreateFileSnapshotForTableWithOnlyPartitionTransactions() throws Exception {
// Given
TableProperties table = createTable("test-table-id-1", "test-table-1");
StateStore stateStore = createStateStoreWithInMemoryTransactionLog(table);
stateStore.initialise();

// When
runSnapshotCreator(table);

// Then
assertThat(snapshotStore(table).getLatestSnapshots())
.isEqualTo(new LatestSnapshots(null, partitionsSnapshot(table, 1)));
assertThat(snapshotStore(table).getFilesSnapshots()).isEmpty();
assertThat(snapshotStore(table).getPartitionsSnapshots()).containsExactly(partitionsSnapshot(table, 1));
}

@Test
void shouldRemoveSnapshotFilesIfDynamoTransactionFailed() throws Exception {
// Given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,18 @@
import sleeper.core.partition.PartitionsBuilder;
import sleeper.core.schema.Schema;
import sleeper.core.schema.type.LongType;
import sleeper.core.statestore.AllReferencesToAFile;
import sleeper.core.statestore.FileReference;
import sleeper.core.statestore.FileReferenceFactory;
import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.transactionlog.DuplicateTransactionNumberException;
import sleeper.core.statestore.StateStoreException;
import sleeper.core.statestore.transactionlog.InMemoryTransactionLogStore;
import sleeper.core.statestore.transactionlog.TransactionLogEntry;
import sleeper.core.statestore.transactionlog.TransactionLogStateStore;
import sleeper.core.statestore.transactionlog.TransactionLogStore;
import sleeper.core.statestore.transactionlog.transactions.AddFilesTransaction;
import sleeper.core.statestore.transactionlog.transactions.InitialisePartitionsTransaction;
import sleeper.statestore.StateStoreFactory;

import java.nio.file.Path;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;

Expand Down Expand Up @@ -123,14 +119,15 @@ void shouldLoadLatestSnapshotsWhenCreatingStateStore() throws Exception {
.rootFirst("root")
.splitToNewChildren("root", "L", "R", 123L)
.buildTree();
initialisePartitionsForSnapshot(tree, 1);
FileReferenceFactory factory = factory(tree);
List<FileReference> files = List.of(
factory.rootFile("file1.parquet", 100L),
factory.partitionFile("L", "file2.parquet", 25L),
factory.partitionFile("R", "file3.parquet", 50L));
addFilesForSnapshot(files, 1);
createSnapshot();
createSnapshotWithFreshState(stateStore -> {
stateStore.initialise(tree.getAllPartitions());
stateStore.addFiles(files);
});

// When
StateStore stateStore = stateStoreFactory().getStateStore(tableProperties);
Expand All @@ -150,14 +147,15 @@ void shouldNotLoadLatestSnapshotsByClassname() throws Exception {
.rootFirst("root")
.splitToNewChildren("root", "L", "R", 123L)
.buildTree();
initialisePartitionsForSnapshot(tree, 1);
FileReferenceFactory factory = factory(tree);
List<FileReference> files = List.of(
factory.rootFile("file1.parquet", 100L),
factory.partitionFile("L", "file2.parquet", 25L),
factory.partitionFile("R", "file3.parquet", 50L));
addFilesForSnapshot(files, 1);
createSnapshot();
createSnapshotWithFreshState(stateStore -> {
stateStore.initialise(tree.getAllPartitions());
stateStore.addFiles(files);
});

// When
StateStore stateStore = stateStoreFactory().getStateStore(tableProperties);
Expand Down Expand Up @@ -186,9 +184,10 @@ void shouldExcludePreviousTransactionsWhenLoadingLatestSnapshots() throws Except
.buildTree();
FileReferenceFactory factory2 = factory(tree2);
FileReference file2 = factory2.rootFile("file2.parquet", 456L);
initialisePartitionsForSnapshot(tree2, 1);
addFilesForSnapshot(List.of(file2), 1);
createSnapshot();
createSnapshotWithFreshState(stateStore2 -> {
stateStore2.initialise(tree2.getAllPartitions());
stateStore2.addFile(file2);
});

// When
stateStore = createStateStore();
Expand All @@ -207,8 +206,9 @@ void shouldLoadLatestPartitionsSnapshotIfNoFilesSnapshotIsPresent() throws Excep
.rootFirst("root")
.splitToNewChildren("root", "L", "R", 123L)
.buildTree();
initialisePartitionsForSnapshot(tree, 1);
createSnapshot();
createSnapshotWithFreshState(stateStore -> {
stateStore.initialise(tree.getAllPartitions());
});

// When
StateStore stateStore = createStateStore();
Expand All @@ -231,8 +231,9 @@ void shouldLoadLatestFilesSnapshotIfNoPartitionsSnapshotIsPresent() throws Excep
factory.rootFile("file1.parquet", 100L),
factory.partitionFile("L", "file2.parquet", 25L),
factory.partitionFile("R", "file3.parquet", 50L));
addFilesForSnapshot(files, 1);
createSnapshot();
createSnapshotWithFreshState(stateStore -> {
stateStore.addFiles(files);
});

// When
StateStore stateStore = createStateStore();
Expand All @@ -243,24 +244,25 @@ void shouldLoadLatestFilesSnapshotIfNoPartitionsSnapshotIsPresent() throws Excep
.containsExactlyElementsOf(files);
}

private void initialisePartitionsForSnapshot(PartitionTree tree, long transactionNumber) throws DuplicateTransactionNumberException {
partitionsStore.addTransaction(new TransactionLogEntry(transactionNumber, DEFAULT_UPDATE_TIME,
new InitialisePartitionsTransaction(tree.getAllPartitions())));
}

private void addFilesForSnapshot(List<FileReference> files, long transactionNumber) throws DuplicateTransactionNumberException {
filesStore.addTransaction(new TransactionLogEntry(transactionNumber, DEFAULT_UPDATE_TIME,
new AddFilesTransaction(files.stream()
.map(file -> AllReferencesToAFile.fileWithOneReference(file, DEFAULT_UPDATE_TIME))
.collect(Collectors.toList()))));
}
private void createSnapshotWithFreshState(SetupStateStore setupState) throws Exception {
TransactionLogStore fileTransactions = new InMemoryTransactionLogStore();
TransactionLogStore partitionTransactions = new InMemoryTransactionLogStore();
StateStore stateStore = TransactionLogStateStore.builder()
.sleeperTable(tableProperties.getStatus())
.schema(schema)
.filesLogStore(fileTransactions)
.partitionsLogStore(partitionTransactions)
.build();
stateStore.fixFileUpdateTime(DEFAULT_UPDATE_TIME);
stateStore.fixPartitionUpdateTime(DEFAULT_UPDATE_TIME);
setupState.run(stateStore);

private void createSnapshot() {
DynamoDBTransactionLogSnapshotStore snapshotStore = new DynamoDBTransactionLogSnapshotStore(
instanceProperties, tableProperties, dynamoDBClient);
new TransactionLogSnapshotCreator(
instanceProperties, tableProperties, filesStore, partitionsStore, configuration,
snapshotStore::getLatestSnapshots, snapshotStore::saveSnapshot)
instanceProperties, tableProperties,
fileTransactions, partitionTransactions,
configuration, snapshotStore::getLatestSnapshots, snapshotStore::saveSnapshot)
.createSnapshot();
}

Expand All @@ -281,4 +283,8 @@ private StateStore createStateStore() {
private StateStoreFactory stateStoreFactory() {
return new StateStoreFactory(instanceProperties, s3Client, dynamoDBClient, configuration);
}

public interface SetupStateStore {
void run(StateStore stateStore) throws StateStoreException;
}
}

0 comments on commit 1204f3e

Please sign in to comment.