Skip to content

Commit

Permalink
Add Delta table read version to connectorInfo
Browse files Browse the repository at this point in the history
  • Loading branch information
jkylling authored and ebyhr committed Dec 29, 2023
1 parent f124589 commit a9d1e03
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
public class DeltaLakeInputInfo
{
private final boolean partitioned;
private final long version;

@JsonCreator
public DeltaLakeInputInfo(@JsonProperty("partitioned") boolean partitioned)
public DeltaLakeInputInfo(@JsonProperty("partitioned") boolean partitioned, @JsonProperty("version") long version)
{
this.partitioned = partitioned;
this.version = version;
}

@JsonProperty
Expand All @@ -34,6 +36,12 @@ public boolean isPartitioned()
return partitioned;
}

@JsonProperty
public long getVersion()
{
return version;
}

@Override
public boolean equals(Object o)
{
Expand All @@ -43,12 +51,12 @@ public boolean equals(Object o)
if (!(o instanceof DeltaLakeInputInfo that)) {
return false;
}
return partitioned == that.partitioned;
return partitioned == that.partitioned && version == that.version;
}

@Override
public int hashCode()
{
return Objects.hash(partitioned);
return Objects.hash(partitioned, version);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2362,8 +2362,9 @@ private void cleanupFailedWrite(ConnectorSession session, String tableLocation,
@Override
public Optional<Object> getInfo(ConnectorTableHandle table)
{
boolean isPartitioned = !((DeltaLakeTableHandle) table).getMetadataEntry().getLowercasePartitionColumns().isEmpty();
return Optional.of(new DeltaLakeInputInfo(isPartitioned));
DeltaLakeTableHandle handle = (DeltaLakeTableHandle) table;
boolean isPartitioned = !handle.getMetadataEntry().getLowercasePartitionColumns().isEmpty();
return Optional.of(new DeltaLakeInputInfo(isPartitioned, handle.getReadVersion()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ public void testGetInputInfoForPartitionedTable()
ImmutableList.of(BIGINT_COLUMN_1));
deltaLakeMetadata.createTable(SESSION, tableMetadata, false);
DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) deltaLakeMetadata.getTableHandle(SESSION, tableMetadata.getTable());
assertThat(deltaLakeMetadata.getInfo(tableHandle)).isEqualTo(Optional.of(new DeltaLakeInputInfo(true)));
assertThat(deltaLakeMetadata.getInfo(tableHandle)).isEqualTo(Optional.of(new DeltaLakeInputInfo(true, 0)));
deltaLakeMetadata.cleanupQuery(SESSION);
}

Expand All @@ -488,7 +488,7 @@ public void testGetInputInfoForUnPartitionedTable()
ImmutableList.of());
deltaLakeMetadata.createTable(SESSION, tableMetadata, false);
DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) deltaLakeMetadata.getTableHandle(SESSION, tableMetadata.getTable());
assertThat(deltaLakeMetadata.getInfo(tableHandle)).isEqualTo(Optional.of(new DeltaLakeInputInfo(false)));
assertThat(deltaLakeMetadata.getInfo(tableHandle)).isEqualTo(Optional.of(new DeltaLakeInputInfo(false, 0)));
deltaLakeMetadata.cleanupQuery(SESSION);
}

Expand Down

0 comments on commit a9d1e03

Please sign in to comment.