From b4bf165cf43f6b035043ce329d084c35bb13c9e1 Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Wed, 23 Oct 2024 08:15:36 +0000 Subject: [PATCH] Move reading snapshots to driver --- .../statestore/AwsSnapshotsDriver.java | 27 ++++++++++++++--- .../dsl/snapshot/SnapshotsDriver.java | 7 +++-- .../dsl/snapshot/WaitForSnapshot.java | 29 +++++-------------- 3 files changed, 35 insertions(+), 28 deletions(-) diff --git a/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/statestore/AwsSnapshotsDriver.java b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/statestore/AwsSnapshotsDriver.java index 8e6ac2704d..2141c5ca74 100644 --- a/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/statestore/AwsSnapshotsDriver.java +++ b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/statestore/AwsSnapshotsDriver.java @@ -18,8 +18,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import sleeper.core.partition.PartitionTree; import sleeper.core.properties.instance.InstanceProperties; import sleeper.core.properties.table.TableProperties; +import sleeper.core.statestore.AllReferencesToAllFiles; +import sleeper.core.statestore.transactionlog.StateStoreFiles; +import sleeper.core.statestore.transactionlog.StateStorePartitions; import sleeper.core.statestore.transactionlog.TransactionLogSnapshot; import sleeper.statestore.transactionlog.DynamoDBTransactionLogSnapshotStore; import sleeper.systemtest.drivers.util.SystemTestClients; @@ -27,6 +31,7 @@ import java.util.Optional; +import static java.util.stream.Collectors.toUnmodifiableList; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_SNAPSHOT_CREATION_RULE; public class AwsSnapshotsDriver implements SnapshotsDriver { @@ -52,17 +57,31 @@ public void disableCreation(InstanceProperties instanceProperties) { } @Override - public Optional loadLatestFilesSnapshot(InstanceProperties instanceProperties, TableProperties tableProperties) { - return snapshotStore(instanceProperties, tableProperties).loadLatestFilesSnapshotIfAtMinimumTransaction(0); + public Optional loadLatestFilesSnapshot(InstanceProperties instanceProperties, TableProperties tableProperties) { + return snapshotStore(instanceProperties, tableProperties) + .loadLatestFilesSnapshotIfAtMinimumTransaction(0) + .map(AwsSnapshotsDriver::readFiles); } @Override - public Optional loadLatestPartitionsSnapshot(InstanceProperties instanceProperties, TableProperties tableProperties) { - return snapshotStore(instanceProperties, tableProperties).loadLatestPartitionsSnapshotIfAtMinimumTransaction(0); + public Optional loadLatestPartitionsSnapshot(InstanceProperties instanceProperties, TableProperties tableProperties) { + return snapshotStore(instanceProperties, tableProperties) + .loadLatestPartitionsSnapshotIfAtMinimumTransaction(0) + .map(AwsSnapshotsDriver::readPartitions); } private DynamoDBTransactionLogSnapshotStore snapshotStore(InstanceProperties instanceProperties, TableProperties tableProperties) { return new DynamoDBTransactionLogSnapshotStore(instanceProperties, tableProperties, clients.getDynamoDB(), clients.createHadoopConf(instanceProperties, tableProperties)); } + + private static AllReferencesToAllFiles readFiles(TransactionLogSnapshot snapshot) { + StateStoreFiles state = snapshot.getState(); + return new AllReferencesToAllFiles(state.referencedAndUnreferenced(), false); + } + + private static PartitionTree readPartitions(TransactionLogSnapshot snapshot) { + StateStorePartitions state = snapshot.getState(); + return new PartitionTree(state.all().stream().collect(toUnmodifiableList())); + } } diff --git a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/snapshot/SnapshotsDriver.java b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/snapshot/SnapshotsDriver.java index 142022165c..67edcbf435 100644 --- a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/snapshot/SnapshotsDriver.java +++ b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/snapshot/SnapshotsDriver.java @@ -15,9 +15,10 @@ */ package sleeper.systemtest.dsl.snapshot; +import sleeper.core.partition.PartitionTree; import sleeper.core.properties.instance.InstanceProperties; import sleeper.core.properties.table.TableProperties; -import sleeper.core.statestore.transactionlog.TransactionLogSnapshot; +import sleeper.core.statestore.AllReferencesToAllFiles; import java.util.Optional; @@ -28,13 +29,13 @@ public interface SnapshotsDriver { void disableCreation(InstanceProperties instanceProperties); - default Optional loadLatestFilesSnapshot(InstanceProperties instanceProperties, TableProperties tableProperties) { + default Optional loadLatestFilesSnapshot(InstanceProperties instanceProperties, TableProperties tableProperties) { throw new UnsupportedOperationException( "Requested loading files snapshot for instance " + instanceProperties.get(ID) + ", table " + tableProperties.getStatus() + ", not currently implemented for this driver"); } - default Optional loadLatestPartitionsSnapshot(InstanceProperties instanceProperties, TableProperties tableProperties) { + default Optional loadLatestPartitionsSnapshot(InstanceProperties instanceProperties, TableProperties tableProperties) { throw new UnsupportedOperationException( "Requested loading partitions snapshot for instance " + instanceProperties.get(ID) + ", table " + tableProperties.getStatus() + ", not currently implemented for this driver"); diff --git a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/snapshot/WaitForSnapshot.java b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/snapshot/WaitForSnapshot.java index c2696dde68..eaaf13f08e 100644 --- a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/snapshot/WaitForSnapshot.java +++ b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/snapshot/WaitForSnapshot.java @@ -20,17 +20,12 @@ import sleeper.core.partition.PartitionTree; import sleeper.core.statestore.AllReferencesToAllFiles; -import sleeper.core.statestore.transactionlog.StateStoreFiles; -import sleeper.core.statestore.transactionlog.StateStorePartitions; -import sleeper.core.statestore.transactionlog.TransactionLogSnapshot; import sleeper.core.util.PollWithRetries; import sleeper.systemtest.dsl.instance.SystemTestInstanceContext; import java.util.Optional; import java.util.function.Predicate; -import static java.util.stream.Collectors.toUnmodifiableList; - public class WaitForSnapshot { public static final Logger LOGGER = LoggerFactory.getLogger(WaitForSnapshot.class); @@ -55,33 +50,25 @@ public PartitionTree waitForPartitionsSnapshot(PollWithRetries poll, Predicate

loadLatestFilesSnapshot() { - Optional snapshotOpt = driver.loadLatestFilesSnapshot( + Optional snapshotOpt = driver.loadLatestFilesSnapshot( instance.getInstanceProperties(), instance.getTableProperties()); if (!snapshotOpt.isPresent()) { LOGGER.info("Found no files snapshot"); + } else { + LOGGER.info("Found {} files in snapshot", snapshotOpt.get().getFiles().size()); } - return snapshotOpt.map(WaitForSnapshot::readFiles); + return snapshotOpt; } private Optional loadLatestPartitionsSnapshot() { - Optional snapshotOpt = driver.loadLatestPartitionsSnapshot( + Optional snapshotOpt = driver.loadLatestPartitionsSnapshot( instance.getInstanceProperties(), instance.getTableProperties()); if (!snapshotOpt.isPresent()) { LOGGER.info("Found no partitions snapshot"); + } else { + LOGGER.info("Found {} partitions in snapshot", snapshotOpt.get().getAllPartitions().size()); } - return snapshotOpt.map(WaitForSnapshot::readPartitions); - } - - private static AllReferencesToAllFiles readFiles(TransactionLogSnapshot snapshot) { - StateStoreFiles state = snapshot.getState(); - LOGGER.info("Found {} files in snapshot", state.referencedAndUnreferenced().size()); - return new AllReferencesToAllFiles(state.referencedAndUnreferenced(), false); - } - - private static PartitionTree readPartitions(TransactionLogSnapshot snapshot) { - StateStorePartitions state = snapshot.getState(); - LOGGER.info("Found {} partitions in snapshot", state.all().size()); - return new PartitionTree(state.all().stream().collect(toUnmodifiableList())); + return snapshotOpt; } private static Predicate> matches(Predicate condition) {