Issue 2271 - Load latest snapshot when first using transaction log state store #2355

Merged

Changes from all commits (26 commits)
7285faa
Start adding tests
kr565370 Apr 22, 2024
475419b
Merge branch '2272-track-transactionlog-snapshots-in-dynamodb' into 2…
kr565370 Apr 22, 2024
54e69f1
Merge branch '2272-track-transactionlog-snapshots-in-dynamodb' into 2…
kr565370 Apr 22, 2024
e0ecc74
Merge branch '2272-track-transactionlog-snapshots-in-dynamodb' into 2…
kr565370 Apr 22, 2024
ed60b52
Create test for loading latest snapshot when creating state store object
kr565370 Apr 22, 2024
a296f6e
Fix failing test, use snapshot loading code in every test
kr565370 Apr 23, 2024
bc42cc0
Use Optional.ifPresent in DynamoDBTransactionLogStateStore.builderFrom
kr565370 Apr 23, 2024
25da78e
Move serde definitions into ifPresent block
kr565370 Apr 23, 2024
2f7a3d6
Add property for enabling loading of latest snapshots
kr565370 Apr 23, 2024
54006ba
Use delegating builder method
kr565370 Apr 23, 2024
a9f4c07
Create snapshot tables in CDK
kr565370 Apr 23, 2024
5d692e6
Merge branch '2272-track-transactionlog-snapshots-in-dynamodb' into 2…
kr565370 Apr 23, 2024
ed79a82
Merge branch 'develop' into 2271-load-latest-snapshot-when-first-usin…
patchwork01 Apr 24, 2024
f18460a
Rename tables created by CDK
kr565370 Apr 24, 2024
274b133
Move call to create snapshot store inside state store creator
kr565370 Apr 24, 2024
1dd1505
Remove old static builder method
kr565370 Apr 24, 2024
dd0332b
Add test for skipping previous transactions when loading latest snaps…
kr565370 Apr 24, 2024
df2105f
Stop wrapping IOException in StateStoreFileUtils
kr565370 Apr 24, 2024
9a2c2f7
Combine snapshot serde classes. Pass snapshot objects to store and se…
kr565370 Apr 24, 2024
d74ae40
Keep names of transaction log properties and tables consistent
kr565370 Apr 24, 2024
410e52c
Convert load latest snapshots property to table property
kr565370 Apr 24, 2024
1099ec5
Add tests for loading files snapshot when partitions snapshot is not …
kr565370 Apr 25, 2024
195d7e6
Merge branch 'develop' into 2271-load-latest-snapshot-when-first-usin…
gaffer01 Apr 26, 2024
d4d1b08
Hold optional snapshots inside LatestSnapshot
kr565370 Apr 26, 2024
2d6a866
Revert "Hold optional snapshots inside LatestSnapshot"
kr565370 Apr 26, 2024
d583ed5
Merge branch 'develop' into 2271-load-latest-snapshot-when-first-usin…
kr565370 Apr 26, 2024
4 changes: 4 additions & 0 deletions example/full/instance.properties
@@ -1033,6 +1033,10 @@ sleeper.default.parquet.writer.version=v2
# strongly consistent. This default can be overridden by a table property.
sleeper.default.table.dynamo.strongly.consistent.reads=false

# If set, transaction log state stores will load the latest snapshot from the snapshot store when
# created.
sleeper.default.metadata.transactionlog.load.latest.snapshots=true

# Specifies the minimum number of leaf partitions that are needed to run a bulk import job. If this
# minimum has not been reached, bulk import jobs will refuse to start.
sleeper.default.bulk.import.min.leaf.partitions=64
4 changes: 4 additions & 0 deletions example/full/table.properties
@@ -126,6 +126,10 @@ sleeper.table.statestore.classname=sleeper.statestore.s3.S3StateStore
# strongly consistent.
sleeper.table.metadata.dynamo.consistent.reads=false

# If set, the transaction log state store will load the latest snapshot from the snapshot store when
# created.
sleeper.table.metadata.transactionlog.load.latest.snapshots=true


## The following table properties relate to ingest.

@@ -24,25 +24,34 @@
import software.constructs.Construct;

import sleeper.configuration.properties.instance.InstanceProperties;
import sleeper.statestore.transactionlog.DynamoDBTransactionLogSnapshotStore;
import sleeper.statestore.transactionlog.DynamoDBTransactionLogStateStore;

import static sleeper.cdk.Utils.removalPolicy;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.FILE_TRANSACTION_LOG_TABLENAME;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.PARTITION_TRANSACTION_LOG_TABLENAME;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_ALL_SNAPSHOTS_TABLENAME;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_FILES_TABLENAME;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_LATEST_SNAPSHOTS_TABLENAME;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_PARTITIONS_TABLENAME;
import static sleeper.configuration.properties.instance.CommonProperty.ID;

public class TransactionLogStateStoreStack extends NestedStack {
private final Table partitionsLogTable;
private final Table filesLogTable;
private final Table latestSnapshotsTable;
private final Table allSnapshotsTable;

public TransactionLogStateStoreStack(
Construct scope, String id, InstanceProperties instanceProperties) {
super(scope, id);

partitionsLogTable = createTransactionLogTable(instanceProperties, "PartitionTransactionLogTable", "partition-transaction-log");
filesLogTable = createTransactionLogTable(instanceProperties, "FileTransactionLogTable", "file-transaction-log");
instanceProperties.set(PARTITION_TRANSACTION_LOG_TABLENAME, partitionsLogTable.getTableName());
instanceProperties.set(FILE_TRANSACTION_LOG_TABLENAME, filesLogTable.getTableName());
partitionsLogTable = createTransactionLogTable(instanceProperties, "TransactionLogPartitionsTable", "transaction-log-partitions");
filesLogTable = createTransactionLogTable(instanceProperties, "TransactionLogFilesTable", "transaction-log-files");
latestSnapshotsTable = createLatestSnapshotsTable(instanceProperties, "TransactionLogLatestSnapshotsTable", "transaction-log-latest-snapshots");
allSnapshotsTable = createAllSnapshotsTable(instanceProperties, "TransactionLogAllSnapshotsTable", "transaction-log-all-snapshots");
instanceProperties.set(TRANSACTION_LOG_PARTITIONS_TABLENAME, partitionsLogTable.getTableName());
instanceProperties.set(TRANSACTION_LOG_FILES_TABLENAME, filesLogTable.getTableName());
instanceProperties.set(TRANSACTION_LOG_LATEST_SNAPSHOTS_TABLENAME, latestSnapshotsTable.getTableName());
instanceProperties.set(TRANSACTION_LOG_ALL_SNAPSHOTS_TABLENAME, allSnapshotsTable.getTableName());
}

private Table createTransactionLogTable(InstanceProperties instanceProperties, String id, String name) {
@@ -62,6 +71,36 @@ private Table createTransactionLogTable(InstanceProperties instanceProperties, S
.build();
}

private Table createLatestSnapshotsTable(InstanceProperties instanceProperties, String id, String name) {
return Table.Builder
.create(this, id)
.tableName(String.join("-", "sleeper", instanceProperties.get(ID), name))
.removalPolicy(removalPolicy(instanceProperties))
.billingMode(BillingMode.PAY_PER_REQUEST)
.partitionKey(Attribute.builder()
.name(DynamoDBTransactionLogSnapshotStore.TABLE_ID)
.type(AttributeType.STRING)
.build())
.build();
}

private Table createAllSnapshotsTable(InstanceProperties instanceProperties, String id, String name) {
return Table.Builder
.create(this, id)
.tableName(String.join("-", "sleeper", instanceProperties.get(ID), name))
.removalPolicy(removalPolicy(instanceProperties))
.billingMode(BillingMode.PAY_PER_REQUEST)
.partitionKey(Attribute.builder()
.name(DynamoDBTransactionLogSnapshotStore.TABLE_ID_AND_SNAPSHOT_TYPE)
.type(AttributeType.STRING)
.build())
.sortKey(Attribute.builder()
.name(DynamoDBTransactionLogSnapshotStore.TRANSACTION_NUMBER)
.type(AttributeType.NUMBER)
.build())
.build();
}

public void grantReadFiles(IGrantable grantee) {
filesLogTable.grantReadData(grantee);
}
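The two snapshot tables added above split responsibilities: the latest-snapshots table keys on table ID alone, so each Sleeper table has at most one item recording its current snapshots, while the all-snapshots table keys on table ID plus snapshot type with the transaction number as a numeric sort key, so the history of snapshots of one type forms a single ordered series. A standalone sketch of that key design (illustrative only — not the Sleeper or DynamoDB client code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

// Standalone illustration of the all-snapshots key design: each
// (tableId, snapshotType) pair owns a series of snapshot entries sorted by
// transaction number, and "latest" reads the highest key, much like a
// DynamoDB query with ScanIndexForward=false and Limit=1.
public class SnapshotIndex {
    private final Map<String, NavigableMap<Long, String>> byPartitionKey = new HashMap<>();

    // Record a snapshot file written at the given transaction number
    public void add(String tableId, String snapshotType, long transactionNumber, String path) {
        byPartitionKey.computeIfAbsent(tableId + "|" + snapshotType, key -> new TreeMap<>())
                .put(transactionNumber, path);
    }

    // Find the most recent snapshot of the given type, if any exists
    public Optional<String> latest(String tableId, String snapshotType) {
        return Optional.ofNullable(byPartitionKey.get(tableId + "|" + snapshotType))
                .map(NavigableMap::lastEntry)
                .map(Map.Entry::getValue);
    }
}
```

Sorting on the transaction number is what lets a newly created state store skip every transaction already covered by the snapshot and only replay later ones from the log.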
@@ -86,11 +86,11 @@ public interface CdkDefinedInstanceProperty extends InstanceProperty {
.build();

// TransactionLogStateStore
CdkDefinedInstanceProperty FILE_TRANSACTION_LOG_TABLENAME = Index.propertyBuilder("sleeper.metadata.transactionlog.dynamo.file.log.table")
CdkDefinedInstanceProperty TRANSACTION_LOG_FILES_TABLENAME = Index.propertyBuilder("sleeper.metadata.transactionlog.dynamo.file.log.table")
.description("The name of the DynamoDB table holding the state store file transaction log.")
.propertyGroup(InstancePropertyGroup.COMMON)
.build();
CdkDefinedInstanceProperty PARTITION_TRANSACTION_LOG_TABLENAME = Index.propertyBuilder("sleeper.metadata.transactionlog.dynamo.partition.log.table")
CdkDefinedInstanceProperty TRANSACTION_LOG_PARTITIONS_TABLENAME = Index.propertyBuilder("sleeper.metadata.transactionlog.dynamo.partition.log.table")
.description("The name of the DynamoDB table holding the state store partition transaction log.")
.propertyGroup(InstancePropertyGroup.COMMON)
.build();
@@ -90,6 +90,12 @@ public interface DefaultProperty {
.defaultValue("false")
.validationPredicate(Utils::isTrueOrFalse)
.propertyGroup(InstancePropertyGroup.DEFAULT).build();
UserDefinedInstanceProperty DEFAULT_TRANSACTION_LOG_LOAD_LATEST_SNAPSHOTS = Index.propertyBuilder("sleeper.default.metadata.transactionlog.load.latest.snapshots")
.description("If set, transaction log state stores will load the latest snapshot from the snapshot store when created.")
.defaultValue("true")
.validationPredicate(Utils::isTrueOrFalse)
.propertyGroup(InstancePropertyGroup.DEFAULT)
.build();
UserDefinedInstanceProperty DEFAULT_BULK_IMPORT_MIN_LEAF_PARTITION_COUNT = Index.propertyBuilder("sleeper.default.bulk.import.min.leaf.partitions")
.description("Specifies the minimum number of leaf partitions that are needed to run a bulk import job. " +
"If this minimum has not been reached, bulk import jobs will refuse to start.")
@@ -55,6 +55,7 @@
import static sleeper.configuration.properties.instance.DefaultProperty.DEFAULT_ROW_GROUP_SIZE;
import static sleeper.configuration.properties.instance.DefaultProperty.DEFAULT_S3A_READAHEAD_RANGE;
import static sleeper.configuration.properties.instance.DefaultProperty.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
import static sleeper.configuration.properties.instance.DefaultProperty.DEFAULT_TRANSACTION_LOG_LOAD_LATEST_SNAPSHOTS;
import static sleeper.configuration.properties.instance.GarbageCollectionProperty.DEFAULT_GARBAGE_COLLECTOR_DELAY_BEFORE_DELETION;
import static sleeper.configuration.properties.instance.NonPersistentEMRProperty.DEFAULT_BULK_IMPORT_EMR_EXECUTOR_ARM_INSTANCE_TYPES;
import static sleeper.configuration.properties.instance.NonPersistentEMRProperty.DEFAULT_BULK_IMPORT_EMR_EXECUTOR_MARKET_TYPE;
@@ -241,6 +242,11 @@ public interface TableProperty extends SleeperProperty {
"are strongly consistent.")
.propertyGroup(TablePropertyGroup.METADATA)
.build();
TableProperty TRANSACTION_LOG_LOAD_LATEST_SNAPSHOTS = Index.propertyBuilder("sleeper.table.metadata.transactionlog.load.latest.snapshots")
.defaultProperty(DEFAULT_TRANSACTION_LOG_LOAD_LATEST_SNAPSHOTS)
.description("If set, the transaction log state store will load the latest snapshot from the snapshot store when created.")
.propertyGroup(TablePropertyGroup.METADATA)
.build();
TableProperty BULK_IMPORT_EMR_INSTANCE_ARCHITECTURE = Index.propertyBuilder("sleeper.table.bulk.import.emr.instance.architecture")
.defaultProperty(DEFAULT_BULK_IMPORT_EMR_INSTANCE_ARCHITECTURE)
.description("(Non-persistent EMR mode only) Which architecture to be used for EC2 instance types " +
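The pair of properties added above follows Sleeper's usual cascade: the table-level `sleeper.table.metadata.transactionlog.load.latest.snapshots` defers to the instance-level `sleeper.default.metadata.transactionlog.load.latest.snapshots` when it has no value of its own. A minimal standalone sketch of that fallback (hypothetical names and accessors, not Sleeper's property index code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Illustrative property cascade: a table property defers to an instance-level
// default when unset. This mimics the defaultProperty(...) wiring in the diff;
// the class and method names here are hypothetical.
public class PropertyFallback {
    private final Map<String, String> tableProps = new HashMap<>();
    private final Map<String, String> instanceDefaults = new HashMap<>();

    public void setDefault(String key, String value) {
        instanceDefaults.put(key, value);
    }

    public void setTableProperty(String key, String value) {
        tableProps.put(key, value);
    }

    // Resolve the table property, falling back to the instance default
    public String get(String tableKey, String defaultKey) {
        return Optional.ofNullable(tableProps.get(tableKey))
                .orElseGet(() -> instanceDefaults.get(defaultKey));
    }
}
```

Because the default value is `"true"`, existing tables pick up snapshot loading without any per-table configuration, and a single table can opt out by setting the table property to `false`.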
@@ -30,16 +30,16 @@
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.DATA_BUCKET;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.FILE_REFERENCE_COUNT_TABLENAME;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.FILE_TRANSACTION_LOG_TABLENAME;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.PARTITION_TABLENAME;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.PARTITION_TRANSACTION_LOG_TABLENAME;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.QUERY_TRACKER_TABLE_NAME;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.REVISION_TABLENAME;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.TABLE_ID_INDEX_DYNAMO_TABLENAME;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.TABLE_NAME_INDEX_DYNAMO_TABLENAME;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.TABLE_ONLINE_INDEX_DYNAMO_TABLENAME;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_ALL_SNAPSHOTS_TABLENAME;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_FILES_TABLENAME;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_LATEST_SNAPSHOTS_TABLENAME;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_PARTITIONS_TABLENAME;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.VERSION;
import static sleeper.configuration.properties.instance.CommonProperty.ACCOUNT;
import static sleeper.configuration.properties.instance.CommonProperty.ID;
@@ -89,8 +89,8 @@ public static InstanceProperties createTestInstanceProperties() {
instanceProperties.set(FILE_REFERENCE_COUNT_TABLENAME, id + "-frc");
instanceProperties.set(PARTITION_TABLENAME, id + "-p");
instanceProperties.set(REVISION_TABLENAME, id + "-rv");
instanceProperties.set(FILE_TRANSACTION_LOG_TABLENAME, id + "-ftl");
instanceProperties.set(PARTITION_TRANSACTION_LOG_TABLENAME, id + "-ptl");
instanceProperties.set(TRANSACTION_LOG_FILES_TABLENAME, id + "-ftl");
instanceProperties.set(TRANSACTION_LOG_PARTITIONS_TABLENAME, id + "-ptl");
instanceProperties.set(TRANSACTION_LOG_ALL_SNAPSHOTS_TABLENAME, id + "-tlas");
instanceProperties.set(TRANSACTION_LOG_LATEST_SNAPSHOTS_TABLENAME, id + "-tlls");
instanceProperties.set(TABLE_NAME_INDEX_DYNAMO_TABLENAME, id + "-tni");
@@ -50,7 +50,7 @@ public StateStore getStateStore(TableProperties tableProperties) {
return new S3StateStore(instanceProperties, tableProperties, dynamoDB, configuration);
}
if (stateStoreClassName.equals(DynamoDBTransactionLogStateStore.class.getName())) {
return new DynamoDBTransactionLogStateStore(instanceProperties, tableProperties, dynamoDB, s3);
return new DynamoDBTransactionLogStateStore(instanceProperties, tableProperties, dynamoDB, s3, configuration);
}
throw new RuntimeException("Unknown StateStore class: " + stateStoreClassName);
}
@@ -32,97 +32,81 @@
import sleeper.core.statestore.AllReferencesToAFile;
import sleeper.core.statestore.FileReference;
import sleeper.core.statestore.FileReferenceSerDe;
import sleeper.core.statestore.StateStoreException;
import sleeper.core.statestore.transactionlog.StateStoreFiles;
import sleeper.core.statestore.transactionlog.StateStorePartitions;
import sleeper.io.parquet.record.ParquetReaderIterator;
import sleeper.io.parquet.record.ParquetRecordReader;
import sleeper.io.parquet.record.ParquetRecordWriterFactory;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class StateStoreFileUtils {
private static final Schema FILES_SCHEMA = initialiseFilesSchema();
private static final Schema PARTITIONS_SCHEMA = initialisePartitionSchema();
private final FileReferenceSerDe serDe = new FileReferenceSerDe();
private final Configuration configuration;
private final Schema schema;

public static StateStoreFileUtils forFiles(Configuration configuration) {
return new StateStoreFileUtils(initialiseFilesSchema(), configuration);
}

public static StateStoreFileUtils forPartitions(Configuration configuration) {
return new StateStoreFileUtils(initialisePartitionSchema(), configuration);
}

private StateStoreFileUtils(Schema schema, Configuration configuration) {
this.schema = schema;
public StateStoreFileUtils(Configuration configuration) {
this.configuration = configuration;
}

public void savePartitions(String path, StateStorePartitions partitions, Schema sleeperSchema) throws StateStoreException {
public void savePartitions(String path, StateStorePartitions partitions, Schema sleeperSchema) throws IOException {
savePartitions(path, partitions.all(), sleeperSchema);
}

public void savePartitions(String path, Collection<Partition> partitions, Schema sleeperSchema) throws StateStoreException {
public void savePartitions(String path, Collection<Partition> partitions, Schema sleeperSchema) throws IOException {
RegionSerDe regionSerDe = new RegionSerDe(sleeperSchema);
save(path, partitions.stream().map(partition -> getRecordFromPartition(partition, regionSerDe)));
save(PARTITIONS_SCHEMA, path, partitions.stream().map(partition -> getRecordFromPartition(partition, regionSerDe)));
}

public void saveFiles(String path, StateStoreFiles files) throws StateStoreException {
public void saveFiles(String path, StateStoreFiles files) throws IOException {
saveFiles(path, files.referencedAndUnreferenced());
}

public void saveFiles(String path, Stream<AllReferencesToAFile> files) throws StateStoreException {
save(path, files.map(this::getRecordFromFile));
public void saveFiles(String path, Stream<AllReferencesToAFile> files) throws IOException {
save(FILES_SCHEMA, path, files.map(this::getRecordFromFile));
}

private void save(String path, Stream<Record> records) throws StateStoreException {
private void save(Schema schema, String path, Stream<Record> records) throws IOException {
try (ParquetWriter<Record> recordWriter = ParquetRecordWriterFactory.createParquetRecordWriter(
new Path(path), schema, configuration)) {
for (Record record : (Iterable<Record>) () -> records.iterator()) {
recordWriter.write(record);
}
} catch (IOException e) {
throw new UncheckedIOException("Failed writing records", e);
}
}

public void loadPartitions(String path, Schema sleeperSchema, Consumer<Partition> partitionConsumer) throws StateStoreException {
public void loadPartitions(String path, Schema sleeperSchema, Consumer<Partition> partitionConsumer) throws IOException {
RegionSerDe regionSerDe = new RegionSerDe(sleeperSchema);
load(path, record -> partitionConsumer.accept(getPartitionFromRecord(record, regionSerDe)));
load(PARTITIONS_SCHEMA, path, record -> partitionConsumer.accept(getPartitionFromRecord(record, regionSerDe)));
}

public void loadFiles(String path, Consumer<AllReferencesToAFile> fileConsumer) throws StateStoreException {
load(path, record -> fileConsumer.accept(getFileFromRecord(record)));
public void loadFiles(String path, Consumer<AllReferencesToAFile> fileConsumer) throws IOException {
load(FILES_SCHEMA, path, record -> fileConsumer.accept(getFileFromRecord(record)));
}

private void load(String path, Consumer<Record> recordConsumer) throws StateStoreException {
private void load(Schema schema, String path, Consumer<Record> recordConsumer) throws IOException {
try (ParquetReader<Record> reader = new ParquetRecordReader.Builder(new Path(path), schema)
.withConf(configuration).build();
ParquetReaderIterator recordReader = new ParquetReaderIterator(reader)) {
while (recordReader.hasNext()) {
recordConsumer.accept(recordReader.next());
}
} catch (IOException e) {
throw new StateStoreException("Failed reading records", e);
}
}

public boolean isEmpty(String path) throws StateStoreException {
try (ParquetReader<Record> reader = new ParquetRecordReader.Builder(new Path(path), schema)
public boolean isEmpty(String path) throws IOException {
try (ParquetReader<Record> reader = new ParquetRecordReader.Builder(new Path(path), FILES_SCHEMA)
.withConf(configuration).build();
ParquetReaderIterator recordReader = new ParquetReaderIterator(reader)) {
if (recordReader.hasNext()) {
return false;
}
} catch (IOException e) {
throw new StateStoreException("Failed reading records", e);
}
return true;
}