Skip to content

Commit

Permalink
Support time travel using temporal version in DeltaLake
Browse files Browse the repository at this point in the history
  • Loading branch information
chenjian2664 committed Jan 28, 2025
1 parent 76d1e6f commit 2c9389f
Show file tree
Hide file tree
Showing 9 changed files with 980 additions and 5 deletions.
36 changes: 36 additions & 0 deletions docs/src/main/sphinx/connector/delta-lake.md
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,42 @@ SELECT *
FROM example.testdb.customer_orders FOR VERSION AS OF 3
```

A different approach to retrieving historical data is to specify a point in time
in the past, such as a day or week ago. The latest snapshot of the table taken
before or at the specified timestamp in the query is internally used for
providing the previous state of the table:

```sql
SELECT *
FROM example.testdb.customer_orders FOR TIMESTAMP AS OF TIMESTAMP '2022-03-23 09:59:29.803 Europe/Vienna';
```

The connector allows you to create a new snapshot through Delta Lake's [replace table](delta-lake-create-or-replace) support:

```sql
CREATE OR REPLACE TABLE example.testdb.customer_orders AS
SELECT *
FROM example.testdb.customer_orders FOR TIMESTAMP AS OF TIMESTAMP '2022-03-23 09:59:29.803 Europe/Vienna';
```

You can use a date to specify a point in time in the past for using a snapshot of a table in a query.
Assuming that the session time zone is `Europe/Vienna`, the following queries are equivalent:

```sql
SELECT *
FROM example.testdb.customer_orders FOR TIMESTAMP AS OF DATE '2022-03-23';
```

```sql
SELECT *
FROM example.testdb.customer_orders FOR TIMESTAMP AS OF TIMESTAMP '2022-03-23 00:00:00';
```

```sql
SELECT *
FROM example.testdb.customer_orders FOR TIMESTAMP AS OF TIMESTAMP '2022-03-23 00:00:00.000 Europe/Vienna';
```

Use the `$history` metadata table to determine the snapshot ID of the
table like in the following query:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.plugin.deltalake.transactionlog.RemoveFileEntry;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
import io.trino.plugin.deltalake.transactionlog.TemporalTimeTravelUtil;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
import io.trino.plugin.deltalake.transactionlog.TransactionLogEntries;
import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriterManager;
Expand Down Expand Up @@ -146,6 +147,8 @@
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.FixedWidthType;
import io.trino.spi.type.HyperLogLogType;
import io.trino.spi.type.LongTimestamp;
import io.trino.spi.type.LongTimestampWithTimeZone;
import io.trino.spi.type.MapType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.TimestampType;
Expand All @@ -160,6 +163,9 @@
import java.net.URISyntaxException;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -335,10 +341,12 @@
import static io.trino.spi.type.RealType.REAL;
import static io.trino.spi.type.SmallintType.SMALLINT;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND;
import static io.trino.spi.type.TinyintType.TINYINT;
import static io.trino.spi.type.TypeUtils.isFloatingPointNaN;
import static io.trino.spi.type.VarbinaryType.VARBINARY;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.Math.floorDiv;
import static java.lang.String.format;
import static java.time.Instant.EPOCH;
import static java.util.Collections.singletonList;
Expand Down Expand Up @@ -525,15 +533,57 @@ public TableSnapshot getSnapshot(ConnectorSession session, SchemaTableName table
}
}

private static long getVersion(TrinoFileSystem fileSystem, String tableLocation, ConnectorTableVersion version)
private static long getVersion(ConnectorSession session, TrinoFileSystem fileSystem, String tableLocation, ConnectorTableVersion version)
{
return switch (version.getPointerType()) {
// TODO https://github.com/trinodb/trino/issues/21024 Add support for reading tables with temporal versions
case TEMPORAL -> throw new TrinoException(NOT_SUPPORTED, "This connector does not support reading tables with TIMESTAMP AS OF");
case TEMPORAL -> getTemporalSnapshotIdFromVersion(session, fileSystem, tableLocation, version);
case TARGET_ID -> getTargetVersion(fileSystem, tableLocation, version);
};
}

/**
 * Converts a temporal version pointer into epoch milliseconds and looks up the matching table version.
 *
 * <ul>
 * <li>{@code DATE}: the start of that day in the session time zone.</li>
 * <li>{@code TIMESTAMP}: the value is truncated (floored) to milliseconds and its wall-clock
 * fields are interpreted in the session time zone.</li>
 * <li>{@code TIMESTAMP WITH TIME ZONE}: the UTC epoch milliseconds carried by the value.</li>
 * </ul>
 *
 * @throws TrinoException with {@code NOT_SUPPORTED} when the version has any other type
 */
private static long getTemporalSnapshotIdFromVersion(ConnectorSession session, TrinoFileSystem fileSystem, String tableLocation, ConnectorTableVersion version)
{
    Type versionType = version.getVersionType();

    long epochMillis;
    if (versionType.equals(DATE)) {
        // Midnight of the requested day, as observed in the session's time zone
        LocalDate requestedDay = LocalDate.ofEpochDay((Long) version.getVersion());
        epochMillis = requestedDay.atStartOfDay()
                .atZone(session.getTimeZoneKey().getZoneId())
                .toInstant()
                .toEpochMilli();
    }
    else if (versionType instanceof TimestampType timestampVersionType) {
        long epochMicrosUtc;
        if (timestampVersionType.isShort()) {
            epochMicrosUtc = (long) version.getVersion();
        }
        else {
            epochMicrosUtc = ((LongTimestamp) version.getVersion()).getEpochMicros();
        }
        // floorDiv keeps pre-epoch values rounding towards negative infinity
        Instant utcInstant = Instant.ofEpochMilli(floorDiv(epochMicrosUtc, MICROSECONDS_PER_MILLISECOND));
        // Take the wall-clock reading of the value at UTC and re-anchor it in the session's time zone
        LocalDateTime wallClock = LocalDateTime.ofInstant(utcInstant, ZoneOffset.UTC);
        epochMillis = wallClock.atZone(session.getTimeZoneKey().getZoneId())
                .toInstant()
                .toEpochMilli();
    }
    else if (versionType instanceof TimestampWithTimeZoneType timeZonedVersionType) {
        if (timeZonedVersionType.isShort()) {
            epochMillis = unpackMillisUtc((long) version.getVersion());
        }
        else {
            epochMillis = ((LongTimestampWithTimeZone) version.getVersion()).getEpochMillis();
        }
    }
    else {
        throw new TrinoException(NOT_SUPPORTED, "Unsupported type for temporal table version: " + versionType.getDisplayName());
    }

    return getSnapshotIdAsOfTime(fileSystem, tableLocation, epochMillis);
}

/**
 * Looks up the table version to use for the given point in time by delegating to
 * {@link TemporalTimeTravelUtil#findLatestVersionUsingTemporal}.
 *
 * @param fileSystem file system used to read the table's transaction log
 * @param tableLocation location of the Delta Lake table
 * @param epochMillis the requested point in time, in milliseconds since the epoch
 * @return the resolved table version
 * @throws RuntimeException wrapping the {@link IOException} raised while reading the transaction log
 */
private static long getSnapshotIdAsOfTime(TrinoFileSystem fileSystem, String tableLocation, long epochMillis)
{
    try {
        return TemporalTimeTravelUtil.findLatestVersionUsingTemporal(fileSystem, tableLocation, epochMillis);
    }
    catch (IOException e) {
        // Keep the unchecked type callers already see, but say which lookup failed
        throw new RuntimeException(
                "Failed to resolve table version as of " + Instant.ofEpochMilli(epochMillis) + " for table location " + tableLocation,
                e);
    }
}

private static long getTargetVersion(TrinoFileSystem fileSystem, String tableLocation, ConnectorTableVersion version)
{
long snapshotId;
Expand Down Expand Up @@ -624,7 +674,7 @@ public LocatedTableHandle getTableHandle(

String tableLocation = table.location();
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
TableSnapshot tableSnapshot = getSnapshot(session, tableName, tableLocation, endVersion.map(version -> getVersion(fileSystem, tableLocation, version)));
TableSnapshot tableSnapshot = getSnapshot(session, tableName, tableLocation, endVersion.map(version -> getVersion(session, fileSystem, tableLocation, version)));

MetadataAndProtocolEntries logEntries;
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake.transactionlog;

import io.airlift.units.DataSize;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.plugin.deltalake.transactionlog.checkpoint.LastCheckpoint;
import io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail;
import io.trino.spi.TrinoException;

import java.io.IOException;
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;

import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.readLastCheckpoint;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogJsonEntryPath;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.loadNewTail;
import static io.trino.spi.StandardErrorCode.INVALID_ARGUMENTS;
import static java.lang.String.format;

/**
 * Resolves a point in time to a Delta Lake table version by comparing it against the
 * timestamps of the {@code commitInfo} entries found in the table's JSON transaction log.
 */
public final class TemporalTimeTravelUtil
{
    private TemporalTimeTravelUtil() {}

    /**
     * Finds the latest table version whose commit timestamp is at or before {@code epochMillis}.
     *
     * <p>The search starts at the version recorded by the last checkpoint (or version 0 when
     * there is none) and then walks the JSON log entries forwards or backwards depending on
     * how that entry's commit timestamp compares to the requested time.
     *
     * @param fileSystem file system used to read the transaction log
     * @param tableLocation location of the Delta Lake table
     * @param epochMillis the requested point in time, in milliseconds since the epoch
     * @return the resolved table version
     * @throws IOException if reading the transaction log fails
     * @throws MissingTransactionLogException if the JSON log entry for the starting version does not exist
     * @throws TrinoException with {@code INVALID_ARGUMENTS} if no version exists at or before the requested time
     */
    public static long findLatestVersionUsingTemporal(TrinoFileSystem fileSystem, String tableLocation, long epochMillis)
            throws IOException
    {
        long entryNumber = readLastCheckpoint(fileSystem, tableLocation)
                .map(LastCheckpoint::version)
                .orElse(0L);
        String transactionLogDir = getTransactionLogDir(tableLocation);

        Optional<TransactionLogEntries> startEntries = loadEntries(fileSystem, transactionLogDir, entryNumber);
        if (startEntries.isEmpty()) {
            throw new MissingTransactionLogException(getTransactionLogJsonEntryPath(transactionLogDir, entryNumber).toString());
        }

        Optional<CommitInfoEntry> commitInfo = firstCommitInfo(fileSystem, startEntries.get());

        // The starting entry carries no commit info, so it cannot be used to pick a search direction
        if (commitInfo.isEmpty()) {
            return findLatestVersionFromWholeTransactions(fileSystem, tableLocation, epochMillis);
        }

        long commitTimestamp = commitInfo.get().timestamp();
        if (commitTimestamp == epochMillis) {
            return entryNumber;
        }

        if (commitTimestamp < epochMillis) {
            // Later versions may still qualify; fall back to the starting version when none does
            long tail = searchTowardsTail(fileSystem, entryNumber + 1, transactionLogDir, epochMillis);
            return tail >= 0 ? tail : entryNumber;
        }

        long head = searchTowardsHead(fileSystem, entryNumber - 1, transactionLogDir, epochMillis);
        if (head >= 0) {
            return head;
        }

        throw new TrinoException(INVALID_ARGUMENTS, format("No temporal version history at or before %s", Instant.ofEpochMilli(epochMillis)));
    }

    /**
     * Walks the log forwards from {@code entryNumber} and returns the last version whose commit
     * timestamp is at or before {@code epochMillis}. Entries without commit info are skipped.
     * The walk stops at the first missing log entry or the first commit later than the requested time.
     *
     * @return the matching version, or -1 if none was found
     */
    private static long searchTowardsTail(TrinoFileSystem fileSystem, long entryNumber, String transactionLogDir, long epochMillis)
            throws IOException
    {
        long version = -1;
        for (long candidate = entryNumber; ; candidate++) {
            Optional<TransactionLogEntries> entries = loadEntries(fileSystem, transactionLogDir, candidate);
            if (entries.isEmpty()) {
                break;
            }

            Optional<CommitInfoEntry> commitInfo = firstCommitInfo(fileSystem, entries.get());
            if (commitInfo.isEmpty()) {
                continue;
            }
            if (commitInfo.get().timestamp() > epochMillis) {
                break;
            }
            version = candidate;
        }
        return version;
    }

    /**
     * Walks the log backwards from {@code entryNumber} and returns the first version whose commit
     * timestamp is at or before {@code epochMillis}. Entries without commit info are skipped.
     * The walk stops at version 0 or at the first missing log entry.
     *
     * @return the matching version, or -1 if none was found
     */
    private static long searchTowardsHead(TrinoFileSystem fileSystem, long entryNumber, String transactionLogDir, long epochMillis)
            throws IOException
    {
        // Starting below zero (checkpoint at version 0) skips the loop instead of probing a log file for a negative version
        for (long candidate = entryNumber; candidate >= 0; candidate--) {
            Optional<TransactionLogEntries> entries = loadEntries(fileSystem, transactionLogDir, candidate);
            if (entries.isEmpty()) {
                break;
            }

            Optional<CommitInfoEntry> commitInfo = firstCommitInfo(fileSystem, entries.get());
            if (commitInfo.isPresent() && commitInfo.get().timestamp() <= epochMillis) {
                // Report the number of the log entry that was read, consistent with the forward
                // search, rather than relying on the version field stored inside the commit info
                return candidate;
            }
        }
        return -1;
    }

    /**
     * Fallback used when the starting entry has no commit info: loads the transactions via
     * {@code loadNewTail} and returns the highest commit version whose timestamp is at or
     * before {@code epochMillis}.
     *
     * @throws TrinoException with {@code INVALID_ARGUMENTS} if no such version exists
     */
    private static long findLatestVersionFromWholeTransactions(TrinoFileSystem fileSystem, String tableLocation, long epochMillis)
            throws IOException
    {
        TransactionLogTail transactionLogTail = loadNewTail(fileSystem, tableLocation, Optional.empty(), Optional.empty(), DataSize.ofBytes(0));

        long version = -1;
        for (Transaction transaction : transactionLogTail.getTransactions()) {
            Optional<CommitInfoEntry> commitInfo;
            // Closed explicitly; see firstCommitInfo for the rationale
            try (Stream<DeltaLakeTransactionLogEntry> entries = transaction.transactionEntries().getEntries(fileSystem)) {
                commitInfo = entries
                        .map(DeltaLakeTransactionLogEntry::getCommitInfo)
                        .filter(Objects::nonNull)
                        .filter(commitInfoEntry -> commitInfoEntry.timestamp() <= epochMillis)
                        .findFirst();
            }
            if (commitInfo.isPresent()) {
                // NOTE(review): this path relies on the version stored in the commit info itself - confirm every writer populates it
                version = Math.max(version, commitInfo.get().version());
            }
        }

        if (version >= 0) {
            return version;
        }

        throw new TrinoException(INVALID_ARGUMENTS, format("No temporal version history at or before %s", Instant.ofEpochMilli(epochMillis)));
    }

    /**
     * Reads the JSON transaction log entry for the given version, if it exists.
     */
    private static Optional<TransactionLogEntries> loadEntries(TrinoFileSystem fileSystem, String transactionLogDir, long entryNumber)
            throws IOException
    {
        return getEntriesFromJson(entryNumber, transactionLogDir, fileSystem, DataSize.ofBytes(0));
    }

    /**
     * Returns the first commit info found in the given log entries, if any.
     */
    private static Optional<CommitInfoEntry> firstCommitInfo(TrinoFileSystem fileSystem, TransactionLogEntries logEntries)
            throws IOException
    {
        // findFirst() short-circuits without closing the stream, so close it explicitly.
        // NOTE(review): the stream presumably holds an open log file - confirm against TransactionLogEntries.getEntries
        try (Stream<DeltaLakeTransactionLogEntry> entries = logEntries.getEntries(fileSystem)) {
            return entries
                    .map(DeltaLakeTransactionLogEntry::getCommitInfo)
                    .filter(Objects::nonNull)
                    .findFirst();
        }
    }
}
Loading

0 comments on commit 2c9389f

Please sign in to comment.