Skip to content

Commit

Permalink
Move reading snapshots to driver
Browse files Browse the repository at this point in the history
  • Loading branch information
patchwork01 committed Oct 23, 2024
1 parent d76e0cf commit b4bf165
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,20 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import sleeper.core.partition.PartitionTree;
import sleeper.core.properties.instance.InstanceProperties;
import sleeper.core.properties.table.TableProperties;
import sleeper.core.statestore.AllReferencesToAllFiles;
import sleeper.core.statestore.transactionlog.StateStoreFiles;
import sleeper.core.statestore.transactionlog.StateStorePartitions;
import sleeper.core.statestore.transactionlog.TransactionLogSnapshot;
import sleeper.statestore.transactionlog.DynamoDBTransactionLogSnapshotStore;
import sleeper.systemtest.drivers.util.SystemTestClients;
import sleeper.systemtest.dsl.snapshot.SnapshotsDriver;

import java.util.Optional;

import static java.util.stream.Collectors.toUnmodifiableList;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_SNAPSHOT_CREATION_RULE;

public class AwsSnapshotsDriver implements SnapshotsDriver {
Expand All @@ -52,17 +57,31 @@ public void disableCreation(InstanceProperties instanceProperties) {
}

@Override
public Optional<TransactionLogSnapshot> loadLatestFilesSnapshot(InstanceProperties instanceProperties, TableProperties tableProperties) {
return snapshotStore(instanceProperties, tableProperties).loadLatestFilesSnapshotIfAtMinimumTransaction(0);
public Optional<AllReferencesToAllFiles> loadLatestFilesSnapshot(InstanceProperties instanceProperties, TableProperties tableProperties) {
return snapshotStore(instanceProperties, tableProperties)
.loadLatestFilesSnapshotIfAtMinimumTransaction(0)
.map(AwsSnapshotsDriver::readFiles);
}

@Override
public Optional<TransactionLogSnapshot> loadLatestPartitionsSnapshot(InstanceProperties instanceProperties, TableProperties tableProperties) {
return snapshotStore(instanceProperties, tableProperties).loadLatestPartitionsSnapshotIfAtMinimumTransaction(0);
public Optional<PartitionTree> loadLatestPartitionsSnapshot(InstanceProperties instanceProperties, TableProperties tableProperties) {
return snapshotStore(instanceProperties, tableProperties)
.loadLatestPartitionsSnapshotIfAtMinimumTransaction(0)
.map(AwsSnapshotsDriver::readPartitions);
}

private DynamoDBTransactionLogSnapshotStore snapshotStore(InstanceProperties instanceProperties, TableProperties tableProperties) {
return new DynamoDBTransactionLogSnapshotStore(instanceProperties, tableProperties,
clients.getDynamoDB(), clients.createHadoopConf(instanceProperties, tableProperties));
}

private static AllReferencesToAllFiles readFiles(TransactionLogSnapshot snapshot) {
StateStoreFiles state = snapshot.getState();
return new AllReferencesToAllFiles(state.referencedAndUnreferenced(), false);
}

private static PartitionTree readPartitions(TransactionLogSnapshot snapshot) {
StateStorePartitions state = snapshot.getState();
return new PartitionTree(state.all().stream().collect(toUnmodifiableList()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
*/
package sleeper.systemtest.dsl.snapshot;

import sleeper.core.partition.PartitionTree;
import sleeper.core.properties.instance.InstanceProperties;
import sleeper.core.properties.table.TableProperties;
import sleeper.core.statestore.transactionlog.TransactionLogSnapshot;
import sleeper.core.statestore.AllReferencesToAllFiles;

import java.util.Optional;

Expand All @@ -28,13 +29,13 @@ public interface SnapshotsDriver {

void disableCreation(InstanceProperties instanceProperties);

default Optional<TransactionLogSnapshot> loadLatestFilesSnapshot(InstanceProperties instanceProperties, TableProperties tableProperties) {
default Optional<AllReferencesToAllFiles> loadLatestFilesSnapshot(InstanceProperties instanceProperties, TableProperties tableProperties) {
throw new UnsupportedOperationException(
"Requested loading files snapshot for instance " + instanceProperties.get(ID) +
", table " + tableProperties.getStatus() + ", not currently implemented for this driver");
}

default Optional<TransactionLogSnapshot> loadLatestPartitionsSnapshot(InstanceProperties instanceProperties, TableProperties tableProperties) {
default Optional<PartitionTree> loadLatestPartitionsSnapshot(InstanceProperties instanceProperties, TableProperties tableProperties) {
throw new UnsupportedOperationException(
"Requested loading partitions snapshot for instance " + instanceProperties.get(ID) +
", table " + tableProperties.getStatus() + ", not currently implemented for this driver");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,12 @@

import sleeper.core.partition.PartitionTree;
import sleeper.core.statestore.AllReferencesToAllFiles;
import sleeper.core.statestore.transactionlog.StateStoreFiles;
import sleeper.core.statestore.transactionlog.StateStorePartitions;
import sleeper.core.statestore.transactionlog.TransactionLogSnapshot;
import sleeper.core.util.PollWithRetries;
import sleeper.systemtest.dsl.instance.SystemTestInstanceContext;

import java.util.Optional;
import java.util.function.Predicate;

import static java.util.stream.Collectors.toUnmodifiableList;

public class WaitForSnapshot {
public static final Logger LOGGER = LoggerFactory.getLogger(WaitForSnapshot.class);

Expand All @@ -55,33 +50,25 @@ public PartitionTree waitForPartitionsSnapshot(PollWithRetries poll, Predicate<P
}

private Optional<AllReferencesToAllFiles> loadLatestFilesSnapshot() {
Optional<TransactionLogSnapshot> snapshotOpt = driver.loadLatestFilesSnapshot(
Optional<AllReferencesToAllFiles> snapshotOpt = driver.loadLatestFilesSnapshot(
instance.getInstanceProperties(), instance.getTableProperties());
if (!snapshotOpt.isPresent()) {
LOGGER.info("Found no files snapshot");
} else {
LOGGER.info("Found {} files in snapshot", snapshotOpt.get().getFiles().size());
}
return snapshotOpt.map(WaitForSnapshot::readFiles);
return snapshotOpt;
}

private Optional<PartitionTree> loadLatestPartitionsSnapshot() {
Optional<TransactionLogSnapshot> snapshotOpt = driver.loadLatestPartitionsSnapshot(
Optional<PartitionTree> snapshotOpt = driver.loadLatestPartitionsSnapshot(
instance.getInstanceProperties(), instance.getTableProperties());
if (!snapshotOpt.isPresent()) {
LOGGER.info("Found no partitions snapshot");
} else {
LOGGER.info("Found {} partitions in snapshot", snapshotOpt.get().getAllPartitions().size());
}
return snapshotOpt.map(WaitForSnapshot::readPartitions);
}

private static AllReferencesToAllFiles readFiles(TransactionLogSnapshot snapshot) {
StateStoreFiles state = snapshot.getState();
LOGGER.info("Found {} files in snapshot", state.referencedAndUnreferenced().size());
return new AllReferencesToAllFiles(state.referencedAndUnreferenced(), false);
}

private static PartitionTree readPartitions(TransactionLogSnapshot snapshot) {
StateStorePartitions state = snapshot.getState();
LOGGER.info("Found {} partitions in snapshot", state.all().size());
return new PartitionTree(state.all().stream().collect(toUnmodifiableList()));
return snapshotOpt;
}

private static <T> Predicate<Optional<T>> matches(Predicate<T> condition) {
Expand Down

0 comments on commit b4bf165

Please sign in to comment.