Issue 2271 - Load latest snapshot when first using transaction log state store #2355

Merged

Changes from 19 commits

Commits (26)
7285faa Start adding tests (kr565370, Apr 22, 2024)
475419b Merge branch '2272-track-transactionlog-snapshots-in-dynamodb' into 2… (kr565370, Apr 22, 2024)
54e69f1 Merge branch '2272-track-transactionlog-snapshots-in-dynamodb' into 2… (kr565370, Apr 22, 2024)
e0ecc74 Merge branch '2272-track-transactionlog-snapshots-in-dynamodb' into 2… (kr565370, Apr 22, 2024)
ed60b52 Create test for loading latest snapshot when creating state store object (kr565370, Apr 22, 2024)
a296f6e Fix failing test, use snapshot loading code in every test (kr565370, Apr 23, 2024)
bc42cc0 Use Optional.ifPresent in DynamoDBTransactionLogStateStore.builderFrom (kr565370, Apr 23, 2024)
25da78e Move serde definitions into ifPresent block (kr565370, Apr 23, 2024)
2f7a3d6 Add property for enabling loading of latest snapshots (kr565370, Apr 23, 2024)
54006ba Use delegating builder method (kr565370, Apr 23, 2024)
a9f4c07 Create snapshot tables in CDK (kr565370, Apr 23, 2024)
5d692e6 Merge branch '2272-track-transactionlog-snapshots-in-dynamodb' into 2… (kr565370, Apr 23, 2024)
ed79a82 Merge branch 'develop' into 2271-load-latest-snapshot-when-first-usin… (patchwork01, Apr 24, 2024)
f18460a Rename tables created by CDK (kr565370, Apr 24, 2024)
274b133 Move call to create snapshot store inside state store creator (kr565370, Apr 24, 2024)
1dd1505 Remove old static builder method (kr565370, Apr 24, 2024)
dd0332b Add test for skipping previous transactions when loading latest snaps… (kr565370, Apr 24, 2024)
df2105f Stop wrapping IOException in StateStoreFileUtils (kr565370, Apr 24, 2024)
9a2c2f7 Combine snapshot serde classes. Pass snapshot objects to store and se… (kr565370, Apr 24, 2024)
d74ae40 Keep names of transaction log properties and tables consistent (kr565370, Apr 24, 2024)
410e52c Convert load latest snapshots property to table property (kr565370, Apr 24, 2024)
1099ec5 Add tests for loading files snapshot when partitions snapshot is not … (kr565370, Apr 25, 2024)
195d7e6 Merge branch 'develop' into 2271-load-latest-snapshot-when-first-usin… (gaffer01, Apr 26, 2024)
d4d1b08 Hold optional snapshots inside LatestSnapshot (kr565370, Apr 26, 2024)
2d6a866 Revert "Hold optional snapshots inside LatestSnapshot" (kr565370, Apr 26, 2024)
d583ed5 Merge branch 'develop' into 2271-load-latest-snapshot-when-first-usin… (kr565370, Apr 26, 2024)
4 changes: 4 additions & 0 deletions example/full/instance.properties
@@ -109,6 +109,10 @@ sleeper.metadata.dynamo.pointintimerecovery=false
 # revision DynamoDB table.
 sleeper.metadata.s3.dynamo.pointintimerecovery=false
 
+# If set, transaction log state stores will load the latest snapshot from the snapshot store when
+# created.
+sleeper.metadata.transactionlog.load.latest.snapshots=true
+
 # This specifies whether point in time recovery is enabled for the Sleeper table index. This is set on
 # the DynamoDB tables.
 sleeper.tables.index.dynamo.pointintimerecovery=false
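For reference, a minimal sketch of overriding the new flag in an instance properties file. The key and default are taken from the diff above; turning it off means a newly created transaction log state store rebuilds its state by replaying the log rather than seeding from a snapshot (a hedged reading of the property description, not text from this PR):

# Skip snapshot loading; rebuild state by replaying the transaction log instead.
sleeper.metadata.transactionlog.load.latest.snapshots=false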
TransactionLogStateStoreStack.java

@@ -24,25 +24,34 @@
 import software.constructs.Construct;
 
 import sleeper.configuration.properties.instance.InstanceProperties;
+import sleeper.statestore.transactionlog.DynamoDBTransactionLogSnapshotStore;
 import sleeper.statestore.transactionlog.DynamoDBTransactionLogStateStore;
 
 import static sleeper.cdk.Utils.removalPolicy;
 import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.FILE_TRANSACTION_LOG_TABLENAME;
 import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.PARTITION_TRANSACTION_LOG_TABLENAME;
+import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_ALL_SNAPSHOTS_TABLENAME;
+import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_LATEST_SNAPSHOTS_TABLENAME;
 import static sleeper.configuration.properties.instance.CommonProperty.ID;
 
 public class TransactionLogStateStoreStack extends NestedStack {
     private final Table partitionsLogTable;
     private final Table filesLogTable;
+    private final Table latestSnapshotsTable;
+    private final Table allSnapshotsTable;
 
     public TransactionLogStateStoreStack(
             Construct scope, String id, InstanceProperties instanceProperties) {
         super(scope, id);
 
         partitionsLogTable = createTransactionLogTable(instanceProperties, "PartitionTransactionLogTable", "partition-transaction-log");
         filesLogTable = createTransactionLogTable(instanceProperties, "FileTransactionLogTable", "file-transaction-log");
+        latestSnapshotsTable = createLatestSnapshotsTable(instanceProperties, "TransactionLogLatestSnapshotsTable", "transaction-log-latest-snapshots");
+        allSnapshotsTable = createAllSnapshotsTable(instanceProperties, "TransactionLogAllSnapshotsTable", "transaction-log-all-snapshots");
         instanceProperties.set(PARTITION_TRANSACTION_LOG_TABLENAME, partitionsLogTable.getTableName());
         instanceProperties.set(FILE_TRANSACTION_LOG_TABLENAME, filesLogTable.getTableName());
+        instanceProperties.set(TRANSACTION_LOG_LATEST_SNAPSHOTS_TABLENAME, latestSnapshotsTable.getTableName());
+        instanceProperties.set(TRANSACTION_LOG_ALL_SNAPSHOTS_TABLENAME, allSnapshotsTable.getTableName());
     }
 
     private Table createTransactionLogTable(InstanceProperties instanceProperties, String id, String name) {
@@ -62,6 +71,36 @@ private Table createTransactionLogTable(InstanceProperties instanceProperties, String id, String name) {
             .build();
     }
 
+    private Table createLatestSnapshotsTable(InstanceProperties instanceProperties, String id, String name) {
+        return Table.Builder
+                .create(this, id)
+                .tableName(String.join("-", "sleeper", instanceProperties.get(ID), name))
+                .removalPolicy(removalPolicy(instanceProperties))
+                .billingMode(BillingMode.PAY_PER_REQUEST)
+                .partitionKey(Attribute.builder()
+                        .name(DynamoDBTransactionLogSnapshotStore.TABLE_ID)
+                        .type(AttributeType.STRING)
+                        .build())
+                .build();
+    }
+
+    private Table createAllSnapshotsTable(InstanceProperties instanceProperties, String id, String name) {
+        return Table.Builder
+                .create(this, id)
+                .tableName(String.join("-", "sleeper", instanceProperties.get(ID), name))
+                .removalPolicy(removalPolicy(instanceProperties))
+                .billingMode(BillingMode.PAY_PER_REQUEST)
+                .partitionKey(Attribute.builder()
+                        .name(DynamoDBTransactionLogSnapshotStore.TABLE_ID_AND_SNAPSHOT_TYPE)
+                        .type(AttributeType.STRING)
+                        .build())
+                .sortKey(Attribute.builder()
+                        .name(DynamoDBTransactionLogSnapshotStore.TRANSACTION_NUMBER)
+                        .type(AttributeType.NUMBER)
+                        .build())
+                .build();
+    }
+
     public void grantReadFiles(IGrantable grantee) {
         filesLogTable.grantReadData(grantee);
     }
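To make the access pattern behind these key schemas concrete, here is a hedged sketch (not code from this PR) of reading the two tables with the AWS SDK for Java v1: a point lookup of the latest snapshot pointer per Sleeper table, and an ordered query over a table's snapshot history. The class and method names are invented for illustration; the attribute-name constants are the ones the CDK code above references.

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.GetItemRequest;
import com.amazonaws.services.dynamodbv2.model.QueryRequest;
import com.amazonaws.services.dynamodbv2.model.QueryResult;

import sleeper.statestore.transactionlog.DynamoDBTransactionLogSnapshotStore;

import java.util.Map;

public class SnapshotTableQueriesSketch {

    // Point lookup: the latest-snapshots table holds one item per Sleeper table,
    // keyed by table id alone, so finding the newest snapshot is a single GetItem.
    public static Map<String, AttributeValue> loadLatestSnapshotItem(
            AmazonDynamoDB dynamoDB, String latestSnapshotsTableName, String sleeperTableId) {
        return dynamoDB.getItem(new GetItemRequest()
                .withTableName(latestSnapshotsTableName)
                .withKey(Map.of(DynamoDBTransactionLogSnapshotStore.TABLE_ID,
                        new AttributeValue().withS(sleeperTableId))))
                .getItem();
    }

    // Range query: the all-snapshots table sorts by transaction number within each
    // table id + snapshot type, so the history can be read in transaction order.
    public static QueryResult listSnapshotsInOrder(
            AmazonDynamoDB dynamoDB, String allSnapshotsTableName, String tableIdAndType) {
        return dynamoDB.query(new QueryRequest()
                .withTableName(allSnapshotsTableName)
                .withKeyConditionExpression("#key = :value")
                .withExpressionAttributeNames(Map.of("#key",
                        DynamoDBTransactionLogSnapshotStore.TABLE_ID_AND_SNAPSHOT_TYPE))
                .withExpressionAttributeValues(Map.of(":value",
                        new AttributeValue().withS(tableIdAndType)))
                .withScanIndexForward(true));
    }
}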
CommonProperty.java

@@ -194,6 +194,12 @@ public interface CommonProperty {
             .validationPredicate(Utils::isTrueOrFalse)
             .propertyGroup(InstancePropertyGroup.COMMON)
             .runCdkDeployWhenChanged(true).build();
+    UserDefinedInstanceProperty TRANSACTION_LOG_STATE_STORE_LOAD_LATEST_SNAPSHOTS = Index.propertyBuilder("sleeper.metadata.transactionlog.load.latest.snapshots")
+            .description("If set, transaction log state stores will load the latest snapshot from the snapshot store when created.")
+            .defaultValue("true")
+            .validationPredicate(Utils::isTrueOrFalse)
+            .propertyGroup(InstancePropertyGroup.COMMON)
+            .build();
     UserDefinedInstanceProperty TABLE_INDEX_DYNAMO_POINT_IN_TIME_RECOVERY = Index.propertyBuilder("sleeper.tables.index.dynamo.pointintimerecovery")
             .description("This specifies whether point in time recovery is enabled for the Sleeper table index. " +
                     "This is set on the DynamoDB tables.")
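As a sketch of how a boolean instance property like this is typically consumed, the state store construction path can check the flag before attempting a snapshot load. The guard below is illustrative and the helper method is hypothetical, not the PR's exact wiring:

import static sleeper.configuration.properties.instance.CommonProperty.TRANSACTION_LOG_STATE_STORE_LOAD_LATEST_SNAPSHOTS;

// Hedged sketch: only query the latest-snapshots table when the flag is enabled;
// otherwise the state store starts empty and replays the transaction log.
boolean loadSnapshots = Boolean.parseBoolean(
        instanceProperties.get(TRANSACTION_LOG_STATE_STORE_LOAD_LATEST_SNAPSHOTS));
if (loadSnapshots) {
    loadLatestSnapshotIntoBuilder(); // hypothetical helper, for illustration only
}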
StateStoreFactory.java

@@ -50,7 +50,7 @@ public StateStore getStateStore(TableProperties tableProperties) {
             return new S3StateStore(instanceProperties, tableProperties, dynamoDB, configuration);
         }
         if (stateStoreClassName.equals(DynamoDBTransactionLogStateStore.class.getName())) {
-            return new DynamoDBTransactionLogStateStore(instanceProperties, tableProperties, dynamoDB, s3);
+            return new DynamoDBTransactionLogStateStore(instanceProperties, tableProperties, dynamoDB, s3, configuration);
         }
         throw new RuntimeException("Unknown StateStore class: " + stateStoreClassName);
     }
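For context, a hedged usage sketch of the factory dispatch above. The STATESTORE_CLASSNAME table property constant and the factory's constructor arguments are assumptions for illustration, not checked against the codebase:

// Ask for the transaction-log-backed state store by class name (property constant assumed).
tableProperties.set(STATESTORE_CLASSNAME, DynamoDBTransactionLogStateStore.class.getName());

// The factory now receives the Hadoop Configuration so the transaction log state
// store can read snapshot Parquet files (constructor shape assumed).
StateStore stateStore = new StateStoreFactory(instanceProperties, dynamoDB, s3, configuration)
        .getStateStore(tableProperties);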
StateStoreFileUtils.java

@@ -32,97 +32,81 @@
 import sleeper.core.statestore.AllReferencesToAFile;
 import sleeper.core.statestore.FileReference;
 import sleeper.core.statestore.FileReferenceSerDe;
-import sleeper.core.statestore.StateStoreException;
 import sleeper.core.statestore.transactionlog.StateStoreFiles;
 import sleeper.core.statestore.transactionlog.StateStorePartitions;
 import sleeper.io.parquet.record.ParquetReaderIterator;
 import sleeper.io.parquet.record.ParquetRecordReader;
 import sleeper.io.parquet.record.ParquetRecordWriterFactory;
 
 import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.time.Instant;
 import java.util.Collection;
 import java.util.List;
 import java.util.function.Consumer;
 import java.util.stream.Stream;
 
 public class StateStoreFileUtils {
+    private static final Schema FILES_SCHEMA = initialiseFilesSchema();
+    private static final Schema PARTITIONS_SCHEMA = initialisePartitionSchema();
     private final FileReferenceSerDe serDe = new FileReferenceSerDe();
     private final Configuration configuration;
-    private final Schema schema;
 
-    public static StateStoreFileUtils forFiles(Configuration configuration) {
-        return new StateStoreFileUtils(initialiseFilesSchema(), configuration);
-    }
-
-    public static StateStoreFileUtils forPartitions(Configuration configuration) {
-        return new StateStoreFileUtils(initialisePartitionSchema(), configuration);
-    }
-
-    private StateStoreFileUtils(Schema schema, Configuration configuration) {
-        this.schema = schema;
+    public StateStoreFileUtils(Configuration configuration) {
         this.configuration = configuration;
     }
 
-    public void savePartitions(String path, StateStorePartitions partitions, Schema sleeperSchema) throws StateStoreException {
+    public void savePartitions(String path, StateStorePartitions partitions, Schema sleeperSchema) throws IOException {
         savePartitions(path, partitions.all(), sleeperSchema);
     }
 
-    public void savePartitions(String path, Collection<Partition> partitions, Schema sleeperSchema) throws StateStoreException {
+    public void savePartitions(String path, Collection<Partition> partitions, Schema sleeperSchema) throws IOException {
         RegionSerDe regionSerDe = new RegionSerDe(sleeperSchema);
-        save(path, partitions.stream().map(partition -> getRecordFromPartition(partition, regionSerDe)));
+        save(PARTITIONS_SCHEMA, path, partitions.stream().map(partition -> getRecordFromPartition(partition, regionSerDe)));
     }
 
-    public void saveFiles(String path, StateStoreFiles files) throws StateStoreException {
+    public void saveFiles(String path, StateStoreFiles files) throws IOException {
         saveFiles(path, files.referencedAndUnreferenced());
     }
 
-    public void saveFiles(String path, Stream<AllReferencesToAFile> files) throws StateStoreException {
-        save(path, files.map(this::getRecordFromFile));
+    public void saveFiles(String path, Stream<AllReferencesToAFile> files) throws IOException {
+        save(FILES_SCHEMA, path, files.map(this::getRecordFromFile));
     }
 
-    private void save(String path, Stream<Record> records) throws StateStoreException {
+    private void save(Schema schema, String path, Stream<Record> records) throws IOException {
         try (ParquetWriter<Record> recordWriter = ParquetRecordWriterFactory.createParquetRecordWriter(
                 new Path(path), schema, configuration)) {
             for (Record record : (Iterable<Record>) () -> records.iterator()) {
                 recordWriter.write(record);
             }
-        } catch (IOException e) {
-            throw new UncheckedIOException("Failed writing records", e);
         }
     }
 
-    public void loadPartitions(String path, Schema sleeperSchema, Consumer<Partition> partitionConsumer) throws StateStoreException {
+    public void loadPartitions(String path, Schema sleeperSchema, Consumer<Partition> partitionConsumer) throws IOException {
         RegionSerDe regionSerDe = new RegionSerDe(sleeperSchema);
-        load(path, record -> partitionConsumer.accept(getPartitionFromRecord(record, regionSerDe)));
+        load(PARTITIONS_SCHEMA, path, record -> partitionConsumer.accept(getPartitionFromRecord(record, regionSerDe)));
     }
 
-    public void loadFiles(String path, Consumer<AllReferencesToAFile> fileConsumer) throws StateStoreException {
-        load(path, record -> fileConsumer.accept(getFileFromRecord(record)));
+    public void loadFiles(String path, Consumer<AllReferencesToAFile> fileConsumer) throws IOException {
+        load(FILES_SCHEMA, path, record -> fileConsumer.accept(getFileFromRecord(record)));
    }
 
-    private void load(String path, Consumer<Record> recordConsumer) throws StateStoreException {
+    private void load(Schema schema, String path, Consumer<Record> recordConsumer) throws IOException {
         try (ParquetReader<Record> reader = new ParquetRecordReader.Builder(new Path(path), schema)
                 .withConf(configuration).build();
                 ParquetReaderIterator recordReader = new ParquetReaderIterator(reader)) {
             while (recordReader.hasNext()) {
                 recordConsumer.accept(recordReader.next());
             }
-        } catch (IOException e) {
-            throw new StateStoreException("Failed reading records", e);
         }
     }
 
-    public boolean isEmpty(String path) throws StateStoreException {
-        try (ParquetReader<Record> reader = new ParquetRecordReader.Builder(new Path(path), schema)
+    public boolean isEmpty(String path) throws IOException {
+        try (ParquetReader<Record> reader = new ParquetRecordReader.Builder(new Path(path), FILES_SCHEMA)
                 .withConf(configuration).build();
                 ParquetReaderIterator recordReader = new ParquetReaderIterator(reader)) {
             if (recordReader.hasNext()) {
                 return false;
             }
-        } catch (IOException e) {
-            throw new StateStoreException("Failed reading records", e);
         }
         return true;
     }
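A brief usage sketch of the reworked class: a single instance now serves both record types, the schema is picked per call, and the caller decides how to handle the checked IOException. The snapshot path below is a hypothetical example:

import org.apache.hadoop.conf.Configuration;

import sleeper.core.statestore.AllReferencesToAFile;
import sleeper.core.statestore.StateStoreException;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// One instance now serves both files and partitions; the schema is chosen per call.
StateStoreFileUtils utils = new StateStoreFileUtils(new Configuration());
List<AllReferencesToAFile> files = new ArrayList<>();
try {
    // Hypothetical snapshot path; loadFiles applies the static FILES_SCHEMA internally.
    utils.loadFiles("s3a://example-bucket/statestore/files/snapshot-42.parquet", files::add);
} catch (IOException e) {
    // The utility no longer wraps IOException; callers pick the exception type.
    throw new StateStoreException("Failed to load files snapshot", e);
}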
S3FileReferenceStore.java

@@ -86,7 +86,7 @@ private S3FileReferenceStore(Builder builder) {
                 .loadAndWriteData(this::readFilesFromParquet, this::writeFilesToParquet)
                 .hadoopConf(conf)
                 .build();
-        stateStoreFileUtils = StateStoreFileUtils.forFiles(conf);
+        stateStoreFileUtils = new StateStoreFileUtils(conf);
     }
 
     static Builder builder() {
@@ -377,7 +377,11 @@ public boolean hasNoFiles() throws StateStoreException {
         if (revisionId == null) {
             return true;
         }
-        return stateStoreFileUtils.isEmpty(getFilesPath(revisionId));
+        try {
+            return stateStoreFileUtils.isEmpty(getFilesPath(revisionId));
+        } catch (IOException e) {
+            throw new StateStoreException("Failed to load files", e);
+        }
     }
 
     @Override
@@ -397,14 +401,22 @@ private String getFilesPath(S3RevisionId revisionId) {
 
     private void writeFilesToParquet(List<AllReferencesToAFile> files, String path) throws StateStoreException {
         LOGGER.debug("Writing {} file records to {}", files.size(), path);
-        stateStoreFileUtils.saveFiles(path, files.stream());
+        try {
+            stateStoreFileUtils.saveFiles(path, files.stream());
+        } catch (IOException e) {
+            throw new StateStoreException("Failed to save files", e);
+        }
         LOGGER.debug("Wrote {} file records to {}", files.size(), path);
     }
 
     private List<AllReferencesToAFile> readFilesFromParquet(String path) throws StateStoreException {
         LOGGER.debug("Loading file records from {}", path);
         List<AllReferencesToAFile> files = new ArrayList<>();
-        stateStoreFileUtils.loadFiles(path, files::add);
+        try {
+            stateStoreFileUtils.loadFiles(path, files::add);
+        } catch (IOException e) {
+            throw new StateStoreException("Failed to load files", e);
+        }
         LOGGER.debug("Loaded {} file records from {}", files.size(), path);
         return files;
     }
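The same wrap-at-the-boundary pattern repeats in S3PartitionStore below. If it spread much further, a small helper could centralise the translation; this is a hypothetical sketch, not code from this PR:

import sleeper.core.statestore.StateStoreException;

import java.io.IOException;

// Hypothetical helper: run an IO action and translate the checked IOException
// into StateStoreException once, at the state store's API boundary.
@FunctionalInterface
interface IORunnable {
    void run() throws IOException;
}

static void wrapIO(String message, IORunnable action) throws StateStoreException {
    try {
        action.run();
    } catch (IOException e) {
        throw new StateStoreException(message, e);
    }
}

// Usage: wrapIO("Failed to load files", () -> stateStoreFileUtils.loadFiles(path, files::add));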
S3PartitionStore.java

@@ -66,7 +66,7 @@ private S3PartitionStore(Builder builder) {
                 .loadAndWriteData(this::readPartitionsMapFromParquet, this::writePartitionsMapToParquet)
                 .hadoopConf(conf)
                 .build();
-        stateStoreFileUtils = StateStoreFileUtils.forPartitions(conf);
+        stateStoreFileUtils = new StateStoreFileUtils(conf);
     }
 
     public static Builder builder() {
@@ -204,14 +204,22 @@ private Map<String, Partition> getMapFromPartitionIdToPartition(List<Partition>
 
     private void writePartitionsToParquet(Collection<Partition> partitions, String path) throws StateStoreException {
         LOGGER.debug("Writing {} partitions to {}", partitions.size(), path);
-        stateStoreFileUtils.savePartitions(path, partitions, tableSchema);
+        try {
+            stateStoreFileUtils.savePartitions(path, partitions, tableSchema);
+        } catch (IOException e) {
+            throw new StateStoreException("Failed to save partitions", e);
+        }
         LOGGER.debug("Wrote {} partitions to {}", partitions.size(), path);
     }
 
     private List<Partition> readPartitionsFromParquet(String path) throws StateStoreException {
         LOGGER.debug("Loading partitions from {}", path);
         List<Partition> partitions = new ArrayList<>();
-        stateStoreFileUtils.loadPartitions(path, tableSchema, partitions::add);
+        try {
+            stateStoreFileUtils.loadPartitions(path, tableSchema, partitions::add);
+        } catch (IOException e) {
+            throw new StateStoreException("Failed to load partitions", e);
+        }
         LOGGER.debug("Loaded {} partitions from {}", partitions.size(), path);
         return partitions;
     }