From 1204f3e7951611d147c494e4e43ccf17f44df654 Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Tue, 30 Apr 2024 14:36:47 +0100 Subject: [PATCH] Use state store to setup snapshot state in state store tests --- .../TransactionLogSnapshotCreatorIT.java | 17 +++++ ...actionLogStateStoreDynamoDBSpecificIT.java | 72 ++++++++++--------- 2 files changed, 56 insertions(+), 33 deletions(-) diff --git a/java/statestore/src/test/java/sleeper/statestore/transactionlog/TransactionLogSnapshotCreatorIT.java b/java/statestore/src/test/java/sleeper/statestore/transactionlog/TransactionLogSnapshotCreatorIT.java index 21f26bbba0..7a8fdfaf96 100644 --- a/java/statestore/src/test/java/sleeper/statestore/transactionlog/TransactionLogSnapshotCreatorIT.java +++ b/java/statestore/src/test/java/sleeper/statestore/transactionlog/TransactionLogSnapshotCreatorIT.java @@ -222,6 +222,23 @@ void shouldNotCreateSnapshotForTableWithNoTransactions() throws Exception { assertThat(snapshotStore(table).getPartitionsSnapshots()).isEmpty(); } + @Test + void shouldNotCreateFileSnapshotForTableWithOnlyPartitionTransactions() throws Exception { + // Given + TableProperties table = createTable("test-table-id-1", "test-table-1"); + StateStore stateStore = createStateStoreWithInMemoryTransactionLog(table); + stateStore.initialise(); + + // When + runSnapshotCreator(table); + + // Then + assertThat(snapshotStore(table).getLatestSnapshots()) + .isEqualTo(new LatestSnapshots(null, partitionsSnapshot(table, 1))); + assertThat(snapshotStore(table).getFilesSnapshots()).isEmpty(); + assertThat(snapshotStore(table).getPartitionsSnapshots()).containsExactly(partitionsSnapshot(table, 1)); + } + @Test void shouldRemoveSnapshotFilesIfDynamoTransactionFailed() throws Exception { // Given diff --git a/java/statestore/src/test/java/sleeper/statestore/transactionlog/TransactionLogStateStoreDynamoDBSpecificIT.java b/java/statestore/src/test/java/sleeper/statestore/transactionlog/TransactionLogStateStoreDynamoDBSpecificIT.java index 1e4ede418e..9779884f5d 100644 --- a/java/statestore/src/test/java/sleeper/statestore/transactionlog/TransactionLogStateStoreDynamoDBSpecificIT.java +++ b/java/statestore/src/test/java/sleeper/statestore/transactionlog/TransactionLogStateStoreDynamoDBSpecificIT.java @@ -26,22 +26,18 @@ import sleeper.core.partition.PartitionsBuilder; import sleeper.core.schema.Schema; import sleeper.core.schema.type.LongType; -import sleeper.core.statestore.AllReferencesToAFile; import sleeper.core.statestore.FileReference; import sleeper.core.statestore.FileReferenceFactory; import sleeper.core.statestore.StateStore; -import sleeper.core.statestore.transactionlog.DuplicateTransactionNumberException; +import sleeper.core.statestore.StateStoreException; import sleeper.core.statestore.transactionlog.InMemoryTransactionLogStore; -import sleeper.core.statestore.transactionlog.TransactionLogEntry; +import sleeper.core.statestore.transactionlog.TransactionLogStateStore; import sleeper.core.statestore.transactionlog.TransactionLogStore; -import sleeper.core.statestore.transactionlog.transactions.AddFilesTransaction; -import sleeper.core.statestore.transactionlog.transactions.InitialisePartitionsTransaction; import sleeper.statestore.StateStoreFactory; import java.nio.file.Path; import java.time.Instant; import java.util.List; -import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.LongStream; @@ -123,14 +119,15 @@ void shouldLoadLatestSnapshotsWhenCreatingStateStore() throws Exception { .rootFirst("root") .splitToNewChildren("root", "L", "R", 123L) .buildTree(); - initialisePartitionsForSnapshot(tree, 1); FileReferenceFactory factory = factory(tree); List files = List.of( factory.rootFile("file1.parquet", 100L), factory.partitionFile("L", "file2.parquet", 25L), factory.partitionFile("R", "file3.parquet", 50L)); - addFilesForSnapshot(files, 1); - createSnapshot(); + createSnapshotWithFreshState(stateStore -> { + stateStore.initialise(tree.getAllPartitions()); + stateStore.addFiles(files); + }); // When StateStore stateStore = stateStoreFactory().getStateStore(tableProperties); @@ -150,14 +147,15 @@ void shouldNotLoadLatestSnapshotsByClassname() throws Exception { .rootFirst("root") .splitToNewChildren("root", "L", "R", 123L) .buildTree(); - initialisePartitionsForSnapshot(tree, 1); FileReferenceFactory factory = factory(tree); List files = List.of( factory.rootFile("file1.parquet", 100L), factory.partitionFile("L", "file2.parquet", 25L), factory.partitionFile("R", "file3.parquet", 50L)); - addFilesForSnapshot(files, 1); - createSnapshot(); + createSnapshotWithFreshState(stateStore -> { + stateStore.initialise(tree.getAllPartitions()); + stateStore.addFiles(files); + }); // When StateStore stateStore = stateStoreFactory().getStateStore(tableProperties); @@ -186,9 +184,10 @@ void shouldExcludePreviousTransactionsWhenLoadingLatestSnapshots() throws Except .buildTree(); FileReferenceFactory factory2 = factory(tree2); FileReference file2 = factory2.rootFile("file2.parquet", 456L); - initialisePartitionsForSnapshot(tree2, 1); - addFilesForSnapshot(List.of(file2), 1); - createSnapshot(); + createSnapshotWithFreshState(stateStore2 -> { + stateStore2.initialise(tree2.getAllPartitions()); + stateStore2.addFile(file2); + }); // When stateStore = createStateStore(); @@ -207,8 +206,9 @@ void shouldLoadLatestPartitionsSnapshotIfNoFilesSnapshotIsPresent() throws Excep .rootFirst("root") .splitToNewChildren("root", "L", "R", 123L) .buildTree(); - initialisePartitionsForSnapshot(tree, 1); - createSnapshot(); + createSnapshotWithFreshState(stateStore -> { + stateStore.initialise(tree.getAllPartitions()); + }); // When StateStore stateStore = createStateStore(); @@ -231,8 +231,9 @@ void shouldLoadLatestFilesSnapshotIfNoPartitionsSnapshotIsPresent() throws Excep factory.rootFile("file1.parquet", 100L), factory.partitionFile("L", "file2.parquet", 25L), factory.partitionFile("R", "file3.parquet", 50L)); - addFilesForSnapshot(files, 1); - createSnapshot(); + createSnapshotWithFreshState(stateStore -> { + stateStore.addFiles(files); + }); // When StateStore stateStore = createStateStore(); @@ -243,24 +244,25 @@ void shouldLoadLatestFilesSnapshotIfNoPartitionsSnapshotIsPresent() throws Excep .containsExactlyElementsOf(files); } - private void initialisePartitionsForSnapshot(PartitionTree tree, long transactionNumber) throws DuplicateTransactionNumberException { - partitionsStore.addTransaction(new TransactionLogEntry(transactionNumber, DEFAULT_UPDATE_TIME, - new InitialisePartitionsTransaction(tree.getAllPartitions()))); - } - - private void addFilesForSnapshot(List files, long transactionNumber) throws DuplicateTransactionNumberException { - filesStore.addTransaction(new TransactionLogEntry(transactionNumber, DEFAULT_UPDATE_TIME, - new AddFilesTransaction(files.stream() - .map(file -> AllReferencesToAFile.fileWithOneReference(file, DEFAULT_UPDATE_TIME)) - .collect(Collectors.toList())))); - } + private void createSnapshotWithFreshState(SetupStateStore setupState) throws Exception { + TransactionLogStore fileTransactions = new InMemoryTransactionLogStore(); + TransactionLogStore partitionTransactions = new InMemoryTransactionLogStore(); + StateStore stateStore = TransactionLogStateStore.builder() + .sleeperTable(tableProperties.getStatus()) + .schema(schema) + .filesLogStore(fileTransactions) + .partitionsLogStore(partitionTransactions) + .build(); + stateStore.fixFileUpdateTime(DEFAULT_UPDATE_TIME); + stateStore.fixPartitionUpdateTime(DEFAULT_UPDATE_TIME); + setupState.run(stateStore); - private void createSnapshot() { DynamoDBTransactionLogSnapshotStore snapshotStore = new DynamoDBTransactionLogSnapshotStore( instanceProperties, tableProperties, dynamoDBClient); new TransactionLogSnapshotCreator( - instanceProperties, tableProperties, filesStore, partitionsStore, configuration, - snapshotStore::getLatestSnapshots, snapshotStore::saveSnapshot) + instanceProperties, tableProperties, + fileTransactions, partitionTransactions, + configuration, snapshotStore::getLatestSnapshots, snapshotStore::saveSnapshot) .createSnapshot(); } @@ -281,4 +283,8 @@ private StateStore createStateStore() { private StateStoreFactory stateStoreFactory() { return new StateStoreFactory(instanceProperties, s3Client, dynamoDBClient, configuration); } + + public interface SetupStateStore { + void run(StateStore stateStore) throws StateStoreException; + } }