Skip to content

Commit

Permalink
Always get latest state in TransactionLogSnapshotCreator
Browse files Browse the repository at this point in the history
  • Loading branch information
kr565370 committed Apr 25, 2024
1 parent ad755e1 commit 5475752
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ private TransactionLogSnapshotUtils() {
}

public static long updateFilesState(
TableStatus table, StateStorePartitions state, TransactionLogStore store, long lastTransactionNumber) throws StateStoreException {
return updateState(TransactionLogHead.builder().forPartitions()
TableStatus table, StateStoreFiles state, TransactionLogStore store, long lastTransactionNumber) throws StateStoreException {
return updateState(TransactionLogHead.builder().forFiles()
.lastTransactionNumber(lastTransactionNumber)
.logStore(store)
.sleeperTable(table)
Expand All @@ -33,8 +33,8 @@ public static long updateFilesState(
}

public static long updatePartitionsState(
TableStatus table, StateStoreFiles state, TransactionLogStore store, long lastTransactionNumber) throws StateStoreException {
return updateState(TransactionLogHead.builder().forFiles()
TableStatus table, StateStorePartitions state, TransactionLogStore store, long lastTransactionNumber) throws StateStoreException {
return updateState(TransactionLogHead.builder().forPartitions()
.lastTransactionNumber(lastTransactionNumber)
.logStore(store)
.sleeperTable(table)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package sleeper.statestore.transactionlog;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.s3.AmazonS3;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -24,34 +25,36 @@
import sleeper.configuration.properties.table.TableProperties;
import sleeper.configuration.properties.table.TablePropertiesProvider;
import sleeper.configuration.properties.table.TableProperty;
import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.StateStoreException;
import sleeper.core.statestore.transactionlog.StateStoreFiles;
import sleeper.core.statestore.transactionlog.StateStorePartitions;
import sleeper.core.statestore.transactionlog.TransactionLogSnapshotUtils;
import sleeper.core.table.InvokeForTableRequest;
import sleeper.statestore.StateStoreProvider;
import sleeper.statestore.transactionlog.DynamoDBTransactionLogSnapshotStore.LatestSnapshots;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Optional;

import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.DATA_BUCKET;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_FILES_TABLENAME;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_PARTITIONS_TABLENAME;
import static sleeper.configuration.properties.instance.CommonProperty.FILE_SYSTEM;

public class TransactionLogSnapshotCreator {
public static final Logger LOGGER = LoggerFactory.getLogger(TransactionLogSnapshotCreator.class);
private final InstanceProperties instanceProperties;
private final TablePropertiesProvider tablePropertiesProvider;
private final StateStoreProvider stateStoreProvider;
private final AmazonS3 s3Client;
private final AmazonDynamoDB dynamoDB;
private final Configuration configuration;

/**
 * Creates a snapshot creator, storing each constructor argument in the field of the same name.
 *
 * NOTE(review): this text is a rendered diff without +/- markers, so two parameter lists appear
 * below: one taking a StateStoreProvider and one taking an AmazonS3 client. They look like the
 * pre- and post-commit versions of the same constructor - confirm against the repository.
 */
public TransactionLogSnapshotCreator(
InstanceProperties instanceProperties, TablePropertiesProvider tablePropertiesProvider, StateStoreProvider stateStoreProvider,
AmazonDynamoDB dynamoDB, Configuration configuration) {
InstanceProperties instanceProperties, TablePropertiesProvider tablePropertiesProvider,
AmazonS3 s3Client, AmazonDynamoDB dynamoDB, Configuration configuration) {
this.instanceProperties = instanceProperties;
this.tablePropertiesProvider = tablePropertiesProvider;
this.stateStoreProvider = stateStoreProvider;
this.s3Client = s3Client;
this.dynamoDB = dynamoDB;
this.configuration = configuration;
}
Expand All @@ -66,52 +69,71 @@ public void createSnapshot(TableProperties table) {
LOGGER.info("Creating snapshot for table {}", table.getStatus());
DynamoDBTransactionLogSnapshotStore snapshotStore = new DynamoDBTransactionLogSnapshotStore(instanceProperties, table, dynamoDB);
Optional<LatestSnapshots> latestSnapshotsOpt = snapshotStore.getLatestSnapshots();
StateStore stateStore = stateStoreProvider.getStateStore(table);
TransactionLogSnapshotSerDe snapshotSerDe = new TransactionLogSnapshotSerDe(table.getSchema(), configuration);
StateStoreFiles filesState = latestSnapshotsOpt
.map(latestSnapshot -> {
try {
return snapshotSerDe.loadFiles(latestSnapshot.getFilesSnapshot());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
})
.orElseGet(StateStoreFiles::new);
long filesTransactionNumberBefore = latestSnapshotsOpt
.map(latestSnapshot -> latestSnapshot.getFilesSnapshot().getTransactionNumber())
.orElse(0L);

StateStorePartitions partitionsState = latestSnapshotsOpt
.map(latestSnapshot -> {
try {
return snapshotSerDe.loadPartitions(latestSnapshot.getPartitionsSnapshot());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
})
.orElseGet(StateStorePartitions::new);
long partitionsTransactionNumberBefore = latestSnapshotsOpt
.map(latestSnapshot -> latestSnapshot.getPartitionsSnapshot().getTransactionNumber())
.orElse(0L);
try {
saveFilesSnapshot(table, stateStore, snapshotStore, latestSnapshotsOpt);
savePartitionsSnapshot(table, stateStore, snapshotStore, latestSnapshotsOpt);
saveFilesSnapshot(table, filesState, filesTransactionNumberBefore, snapshotSerDe, snapshotStore);
savePartitionsSnapshot(table, partitionsState, partitionsTransactionNumberBefore, snapshotSerDe, snapshotStore);
} catch (DuplicateSnapshotException | StateStoreException | IOException e) {
LOGGER.error("Failed to create snapshot for table {}", table.getStatus());
throw new RuntimeException(e);
}
}

/**
 * Saves a snapshot of the files state for a table and records it in the snapshot store.
 *
 * NOTE(review): this text is a rendered diff without +/- markers, so two versions of the method
 * are interleaved below. In the version that takes a StateStoreFiles and a transaction number:
 * the state is passed to TransactionLogSnapshotUtils.updateFilesState together with a
 * DynamoDBTransactionLogStore built on the files transaction log table; if the returned
 * transaction number equals the one passed in, the method returns without saving; otherwise the
 * state is written with the snapshot SerDe and the snapshot is saved to the snapshot store under
 * the returned transaction number.
 */
private void saveFilesSnapshot(TableProperties table, StateStore stateStore,
DynamoDBTransactionLogSnapshotStore snapshotStore, Optional<LatestSnapshots> latestSnapshotsOpt) throws IOException, StateStoreException, DuplicateSnapshotException {
long lastTransactionNumber = 0L;
if (latestSnapshotsOpt.isPresent()) {
lastTransactionNumber = latestSnapshotsOpt.get().getFilesSnapshot().getTransactionNumber();
private void saveFilesSnapshot(TableProperties table, StateStoreFiles filesState, long transactionNumberBefore,
TransactionLogSnapshotSerDe snapshotSerDe, DynamoDBTransactionLogSnapshotStore snapshotStore) throws IOException, StateStoreException, DuplicateSnapshotException {
long transactionNumberAfter = TransactionLogSnapshotUtils.updateFilesState(
table.getStatus(), filesState,
new DynamoDBTransactionLogStore(instanceProperties.get(TRANSACTION_LOG_FILES_TABLENAME),
instanceProperties, table, dynamoDB, s3Client),
transactionNumberBefore);
// The update left the transaction number unchanged, so no new snapshot is written
if (transactionNumberBefore == transactionNumberAfter) {
return;
}
TransactionLogSnapshotSerDe snapshotSerDe = new TransactionLogSnapshotSerDe(table.getSchema(), configuration);
TransactionLogSnapshot snapshot = TransactionLogSnapshot.forFiles(getBasePath(instanceProperties, table), lastTransactionNumber);
snapshotSerDe.saveFiles(snapshot, getFilesState(stateStore));
TransactionLogSnapshot snapshot = TransactionLogSnapshot.forFiles(getBasePath(instanceProperties, table), transactionNumberAfter);
snapshotSerDe.saveFiles(snapshot, filesState);
// Record the snapshot in the snapshot store after its contents have been written by the SerDe
snapshotStore.saveSnapshot(snapshot);
}

/**
 * Saves a snapshot of the partitions state for a table and records it in the snapshot store.
 *
 * NOTE(review): this text is a rendered diff without +/- markers, so two versions of the method
 * are interleaved below. In the version that takes a StateStorePartitions and a transaction
 * number: the state is passed to TransactionLogSnapshotUtils.updatePartitionsState together with
 * a DynamoDBTransactionLogStore built on the partitions transaction log table; if the returned
 * transaction number equals the one passed in, the method returns without saving; otherwise the
 * state is written with the snapshot SerDe and the snapshot is saved to the snapshot store under
 * the returned transaction number.
 */
private void savePartitionsSnapshot(TableProperties table, StateStore stateStore,
DynamoDBTransactionLogSnapshotStore snapshotStore, Optional<LatestSnapshots> latestSnapshotsOpt) throws IOException, StateStoreException, DuplicateSnapshotException {
long lastTransactionNumber = 0L;
if (latestSnapshotsOpt.isPresent()) {
lastTransactionNumber = latestSnapshotsOpt.get().getPartitionsSnapshot().getTransactionNumber();
private void savePartitionsSnapshot(TableProperties table, StateStorePartitions partitionsState, long transactionNumberBefore,
TransactionLogSnapshotSerDe snapshotSerDe, DynamoDBTransactionLogSnapshotStore snapshotStore) throws IOException, StateStoreException, DuplicateSnapshotException {
long transactionNumberAfter = TransactionLogSnapshotUtils.updatePartitionsState(
table.getStatus(), partitionsState,
new DynamoDBTransactionLogStore(instanceProperties.get(TRANSACTION_LOG_PARTITIONS_TABLENAME),
instanceProperties, table, dynamoDB, s3Client),
transactionNumberBefore);
// The update left the transaction number unchanged, so no new snapshot is written
if (transactionNumberBefore == transactionNumberAfter) {
return;
}
TransactionLogSnapshotSerDe snapshotSerDe = new TransactionLogSnapshotSerDe(table.getSchema(), configuration);
TransactionLogSnapshot snapshot = TransactionLogSnapshot.forPartitions(getBasePath(instanceProperties, table), lastTransactionNumber);
snapshotSerDe.savePartitions(snapshot, getPartitionsState(stateStore));
TransactionLogSnapshot snapshot = TransactionLogSnapshot.forPartitions(getBasePath(instanceProperties, table), transactionNumberAfter);
snapshotSerDe.savePartitions(snapshot, partitionsState);
// Record the snapshot in the snapshot store after its contents have been written by the SerDe
snapshotStore.saveSnapshot(snapshot);
}

/**
 * Builds a fresh {@link StateStoreFiles} holding every file the given state store reports.
 *
 * @param  store               the state store to read files from
 * @return                     a new files state containing each reported file
 * @throws StateStoreException if the state store query fails
 */
private StateStoreFiles getFilesState(StateStore store) throws StateStoreException {
    StateStoreFiles filesState = new StateStoreFiles();
    // 100 is handed to getAllFilesWithMaxUnreferenced; presumably a cap on unreferenced files - confirm
    store.getAllFilesWithMaxUnreferenced(100)
            .getFiles()
            .forEach(file -> filesState.add(file));
    return filesState;
}

/**
 * Builds a fresh {@link StateStorePartitions} holding every partition the given state store reports.
 *
 * @param  store               the state store to read partitions from
 * @return                     a new partitions state containing each reported partition
 * @throws StateStoreException if the state store query fails
 */
private StateStorePartitions getPartitionsState(StateStore store) throws StateStoreException {
    StateStorePartitions partitionsState = new StateStorePartitions();
    store.getAllPartitions()
            .forEach(partition -> partitionsState.put(partition));
    return partitionsState;
}

private static String getBasePath(InstanceProperties instanceProperties, TableProperties tableProperties) {
return instanceProperties.get(FILE_SYSTEM)
+ instanceProperties.get(DATA_BUCKET) + "/"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.StateStoreException;
import sleeper.core.table.InvokeForTableRequest;
import sleeper.statestore.StateStoreProvider;
import sleeper.statestore.transactionlog.DynamoDBTransactionLogSnapshotStore.LatestSnapshots;

import java.nio.file.Path;
Expand All @@ -54,7 +53,6 @@ public class TransactionLogSnapshotCreatorIT extends TransactionLogStateStoreTes
private final Schema schema = schemaWithKey("key", new LongType());
private final TablePropertiesStore store = InMemoryTableProperties.getStore();
private final TablePropertiesProvider provider = new TablePropertiesProvider(instanceProperties, store, Instant::now);
private final StateStoreProvider stateStoreProvider = new StateStoreProvider(instanceProperties, s3Client, dynamoDBClient, new Configuration());

@BeforeEach
public void setup() {
Expand All @@ -77,8 +75,8 @@ void shouldCreateSnapshotsForOneTable() throws Exception {
// Then
assertThat(snapshotStore(table).getLatestSnapshots())
.contains(new LatestSnapshots(
filesSnapshot(table, 0),
partitionsSnapshot(table, 0)));
filesSnapshot(table, 1),
partitionsSnapshot(table, 1)));
}

@Test
Expand All @@ -102,17 +100,37 @@ void shouldCreateSnapshotsForMultipleTables() throws Exception {
// Then
assertThat(snapshotStore(table1).getLatestSnapshots())
.contains(new LatestSnapshots(
filesSnapshot(table1, 0),
partitionsSnapshot(table1, 0)));
filesSnapshot(table1, 1),
partitionsSnapshot(table1, 1)));
assertThat(snapshotStore(table2).getLatestSnapshots())
.contains(new LatestSnapshots(
filesSnapshot(table2, 0),
partitionsSnapshot(table2, 0)));
filesSnapshot(table2, 1),
partitionsSnapshot(table2, 1)));
}

@Test
void shouldSkipCreatingSnapshotsIfStateHasNotUpdatedSinceLastSnapshot() throws Exception {
    // Given a table holding one file, for which the snapshot creator has already run once
    String tableId = "test-table-id-1";
    TableProperties tableProperties = createTable(tableId, "test-table-1");
    StateStore stateStore = createStateStore(tableProperties);
    stateStore.initialise();
    stateStore.addFile(FileReferenceFactory.from(stateStore).rootFile(123L));
    runSnapshotCreator(forTableIds(tableId));

    // When the snapshot creator runs again without any further state store updates
    runSnapshotCreator(forTableIds(tableId));

    // Then the latest snapshots are still the ones numbered 1
    assertThat(snapshotStore(tableProperties).getLatestSnapshots())
            .contains(new LatestSnapshots(
                    filesSnapshot(tableProperties, 1),
                    partitionsSnapshot(tableProperties, 1)));
}

// Builds a TransactionLogSnapshotCreator from the test's properties and clients, then runs it
// for the tables named in the given request.
// NOTE(review): this text is a rendered diff without +/- markers, so two constructor argument
// lists appear below (one with stateStoreProvider, one with s3Client) - confirm which is current.
private void runSnapshotCreator(InvokeForTableRequest tableRequest) throws StateStoreException {
new TransactionLogSnapshotCreator(
instanceProperties, provider, stateStoreProvider, dynamoDBClient, new Configuration())
instanceProperties, provider, s3Client, dynamoDBClient, new Configuration())
.run(tableRequest);
}

Expand Down

0 comments on commit 5475752

Please sign in to comment.