Skip to content

Commit

Permalink
Support time travel using temporal version in DeltaLake
Browse files Browse the repository at this point in the history
  • Loading branch information
chenjian2664 committed Jan 31, 2025
1 parent 76d1e6f commit 83c69f0
Show file tree
Hide file tree
Showing 33 changed files with 1,154 additions and 6 deletions.
36 changes: 36 additions & 0 deletions docs/src/main/sphinx/connector/delta-lake.md
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,42 @@ SELECT *
FROM example.testdb.customer_orders FOR VERSION AS OF 3
```

A different approach of retrieving historical data is to specify a point in time
in the past, such as a day or week ago. The latest snapshot of the table taken
before or at the specified timestamp in the query is internally used for
providing the previous state of the table:

```sql
SELECT *
FROM example.testdb.customer_orders FOR TIMESTAMP AS OF TIMESTAMP '2022-03-23 09:59:29.803 America/Los_Angeles';
```

The connector allows to create a new snapshot through Delta Lake's [replace table](delta-lake-create-or-replace).

```sql
CREATE OR REPLACE TABLE example.testdb.customer_orders AS
SELECT *
FROM example.testdb.customer_orders FOR TIMESTAMP AS OF TIMESTAMP '2022-03-23 09:59:29.803 America/Los_Angeles';
```

You can use a date to specify a point a time in the past for using a snapshot of a table in a query.
Assuming that the session time zone is `America/Los_Angeles` the following queries are equivalent:

```sql
SELECT *
FROM example.testdb.customer_orders FOR TIMESTAMP AS OF DATE '2022-03-23';
```

```sql
SELECT *
FROM example.testdb.customer_orders FOR TIMESTAMP AS OF TIMESTAMP '2022-03-23 00:00:00';
```

```sql
SELECT *
FROM example.testdb.customer_orders FOR TIMESTAMP AS OF TIMESTAMP '2022-03-23 00:00:00.000 America/Los_Angeles';
```

Use the `$history` metadata table to determine the snapshot ID of the
table like in the following query:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.FixedWidthType;
import io.trino.spi.type.HyperLogLogType;
import io.trino.spi.type.LongTimestamp;
import io.trino.spi.type.LongTimestampWithTimeZone;
import io.trino.spi.type.MapType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.TimestampType;
Expand All @@ -160,6 +162,8 @@
import java.net.URISyntaxException;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -288,6 +292,7 @@
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeTableFeatures.unsupportedWriterFeatures;
import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.DELTA_CHANGE_DATA_FEED_ENABLED_PROPERTY;
import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.configurationForNewTable;
import static io.trino.plugin.deltalake.transactionlog.TemporalTimeTravelUtil.findLatestVersionUsingTemporal;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.getMandatoryCurrentVersion;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogJsonEntryPath;
Expand Down Expand Up @@ -335,10 +340,12 @@
import static io.trino.spi.type.RealType.REAL;
import static io.trino.spi.type.SmallintType.SMALLINT;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND;
import static io.trino.spi.type.TinyintType.TINYINT;
import static io.trino.spi.type.TypeUtils.isFloatingPointNaN;
import static io.trino.spi.type.VarbinaryType.VARBINARY;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.Math.floorDiv;
import static java.lang.String.format;
import static java.time.Instant.EPOCH;
import static java.util.Collections.singletonList;
Expand Down Expand Up @@ -525,15 +532,60 @@ public TableSnapshot getSnapshot(ConnectorSession session, SchemaTableName table
}
}

private static long getVersion(TrinoFileSystem fileSystem, String tableLocation, ConnectorTableVersion version)
private static long getVersion(ConnectorSession session, TrinoFileSystem fileSystem, String tableLocation, ConnectorTableVersion version, Executor executor)
{
return switch (version.getPointerType()) {
// TODO https://github.com/trinodb/trino/issues/21024 Add support for reading tables with temporal versions
case TEMPORAL -> throw new TrinoException(NOT_SUPPORTED, "This connector does not support reading tables with TIMESTAMP AS OF");
case TEMPORAL -> getTemporalSnapshotIdFromVersion(session, fileSystem, tableLocation, version, executor);
case TARGET_ID -> getTargetVersion(fileSystem, tableLocation, version);
};
}

private static long getTemporalSnapshotIdFromVersion(ConnectorSession session, TrinoFileSystem fileSystem, String tableLocation, ConnectorTableVersion version, Executor executor)
{
Type versionType = version.getVersionType();
if (versionType.equals(DATE)) {
// Retrieve the latest snapshot made before or at the beginning of the day of the specified date in the session's time zone
long epochMillis = LocalDate.ofEpochDay((Long) version.getVersion())
.atStartOfDay()
.atZone(session.getTimeZoneKey().getZoneId())
.toInstant()
.toEpochMilli();
return findSnapshotIdAsOfTime(fileSystem, tableLocation, epochMillis, executor);
}
if (versionType instanceof TimestampType timestampVersionType) {
long epochMicrosUtc = timestampVersionType.isShort()
? (long) version.getVersion()
: ((LongTimestamp) version.getVersion()).getEpochMicros();
long epochMillisUtc = floorDiv(epochMicrosUtc, MICROSECONDS_PER_MILLISECOND);
long epochMillis = Instant.ofEpochMilli(epochMillisUtc)
.atOffset(ZoneOffset.UTC)
.atZoneSameInstant(session.getTimeZoneKey().getZoneId())
.toInstant()
.toEpochMilli();
return findSnapshotIdAsOfTime(fileSystem, tableLocation, epochMillis, executor);
}
if (versionType instanceof TimestampWithTimeZoneType timeZonedVersionType) {
long epochMillis = timeZonedVersionType.isShort()
? unpackMillisUtc((long) version.getVersion())
: ((LongTimestampWithTimeZone) version.getVersion()).getEpochMillis();
return findSnapshotIdAsOfTime(fileSystem, tableLocation, epochMillis, executor);
}

throw new TrinoException(NOT_SUPPORTED, "Unsupported type for temporal table version: " + versionType.getDisplayName());
}

private static long findSnapshotIdAsOfTime(TrinoFileSystem fileSystem, String tableLocation, long epochMillis, Executor executor)
{
try {
return findLatestVersionUsingTemporal(fileSystem, tableLocation, epochMillis, executor);
}
catch (IOException e) {
throw new TrinoException(DELTA_LAKE_FILESYSTEM_ERROR,
format("Unexpected IO exception occurred while reading the entries under the location %s for finding latest snapshot id before or at %s",
tableLocation, Instant.ofEpochMilli(epochMillis)), e);
}
}

private static long getTargetVersion(TrinoFileSystem fileSystem, String tableLocation, ConnectorTableVersion version)
{
long snapshotId;
Expand Down Expand Up @@ -624,7 +676,7 @@ public LocatedTableHandle getTableHandle(

String tableLocation = table.location();
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
TableSnapshot tableSnapshot = getSnapshot(session, tableName, tableLocation, endVersion.map(version -> getVersion(fileSystem, tableLocation, version)));
TableSnapshot tableSnapshot = getSnapshot(session, tableName, tableLocation, endVersion.map(version -> getVersion(session, fileSystem, tableLocation, version, metadataFetchingExecutor)));

MetadataAndProtocolEntries logEntries;
try {
Expand Down Expand Up @@ -3072,6 +3124,7 @@ private CommitInfoEntry getCommitInfoEntry(
return new CommitInfoEntry(
commitVersion,
createdTime,
null,
session.getUser(),
session.getUser(),
operation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
public record CommitInfoEntry(
long version,
long timestamp,
Long inCommitTimestamp, // TODO: support in the HistoryTable
String userId,
String userName,
String operation,
Expand All @@ -48,7 +49,7 @@ public record CommitInfoEntry(

public CommitInfoEntry withVersion(long version)
{
return new CommitInfoEntry(version, timestamp, userId, userName, operation, operationParameters, job, notebook, clusterId, readVersion, isolationLevel, isBlindAppend, operationMetrics);
return new CommitInfoEntry(version, timestamp, inCommitTimestamp, userId, userName, operation, operationParameters, job, notebook, clusterId, readVersion, isolationLevel, isBlindAppend, operationMetrics);
}

public long getRetainedSizeInBytes()
Expand Down
Loading

0 comments on commit 83c69f0

Please sign in to comment.