Skip to content

Commit

Permalink
Add builder to MetadataEntry in Delta Lake
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed May 1, 2024
1 parent 2a0b435 commit d9ece50
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry.Format;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.plugin.deltalake.transactionlog.RemoveFileEntry;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
Expand Down Expand Up @@ -1049,14 +1048,14 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
appendTableEntries(
commitVersion,
transactionLogWriter,
randomUUID().toString(),
serializeSchemaAsJson(deltaTable.build()),
getPartitionedBy(tableMetadata.getProperties()),
configurationForNewTable(checkpointInterval, changeDataFeedEnabled, columnMappingMode, maxFieldId),
saveMode == SaveMode.REPLACE ? CREATE_OR_REPLACE_TABLE_OPERATION : CREATE_TABLE_OPERATION,
session,
tableMetadata.getComment(),
protocolEntry);
protocolEntry,
MetadataEntry.builder()
.setDescription(tableMetadata.getComment())
.setSchemaString(serializeSchemaAsJson(deltaTable.build()))
.setPartitionColumns(getPartitionedBy(tableMetadata.getProperties()))
.setConfiguration(configurationForNewTable(checkpointInterval, changeDataFeedEnabled, columnMappingMode, maxFieldId)));

transactionLogWriter.flush();
}
Expand Down Expand Up @@ -1397,14 +1396,14 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(
appendTableEntries(
commitVersion,
transactionLogWriter,
randomUUID().toString(),
schemaString,
handle.partitionedBy(),
configurationForNewTable(handle.checkpointInterval(), handle.changeDataFeedEnabled(), columnMappingMode, handle.maxColumnId()),
handle.replace() ? CREATE_OR_REPLACE_TABLE_AS_OPERATION : CREATE_TABLE_AS_OPERATION,
session,
handle.comment(),
handle.protocolEntry());
handle.protocolEntry(),
MetadataEntry.builder()
.setDescription(handle.comment())
.setSchemaString(schemaString)
.setPartitionColumns(handle.partitionedBy())
.setConfiguration(configurationForNewTable(handle.checkpointInterval(), handle.changeDataFeedEnabled(), columnMappingMode, handle.maxColumnId())));
appendAddFileEntries(transactionLogWriter, dataFileInfos, physicalPartitionNames, columnNames, true);
if (handle.readVersion().isPresent()) {
long writeTimestamp = Instant.now().toEpochMilli();
Expand Down Expand Up @@ -1500,14 +1499,11 @@ public void setTableComment(ConnectorSession session, ConnectorTableHandle table
appendTableEntries(
commitVersion,
transactionLogWriter,
handle.getMetadataEntry().getId(),
handle.getMetadataEntry().getSchemaString(),
handle.getMetadataEntry().getOriginalPartitionColumns(),
handle.getMetadataEntry().getConfiguration(),
SET_TBLPROPERTIES_OPERATION,
session,
comment,
protocolEntry);
protocolEntry,
MetadataEntry.builder(handle.getMetadataEntry())
.setDescription(comment));
transactionLogWriter.flush();
}
catch (Exception e) {
Expand Down Expand Up @@ -1540,14 +1536,11 @@ public void setColumnComment(ConnectorSession session, ConnectorTableHandle tabl
appendTableEntries(
commitVersion,
transactionLogWriter,
deltaLakeTableHandle.getMetadataEntry().getId(),
serializeSchemaAsJson(deltaTable),
deltaLakeTableHandle.getMetadataEntry().getOriginalPartitionColumns(),
deltaLakeTableHandle.getMetadataEntry().getConfiguration(),
CHANGE_COLUMN_OPERATION,
session,
Optional.ofNullable(deltaLakeTableHandle.getMetadataEntry().getDescription()),
protocolEntry);
protocolEntry,
MetadataEntry.builder(deltaLakeTableHandle.getMetadataEntry())
.setSchemaString(serializeSchemaAsJson(deltaTable)));
transactionLogWriter.flush();
}
catch (Exception e) {
Expand Down Expand Up @@ -1629,14 +1622,12 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
appendTableEntries(
commitVersion,
transactionLogWriter,
handle.getMetadataEntry().getId(),
serializeSchemaAsJson(deltaTable),
handle.getMetadataEntry().getOriginalPartitionColumns(),
configuration,
ADD_COLUMN_OPERATION,
session,
Optional.ofNullable(handle.getMetadataEntry().getDescription()),
buildProtocolEntryForNewColumn(protocolEntry, newColumnMetadata.getType()));
buildProtocolEntryForNewColumn(protocolEntry, newColumnMetadata.getType()),
MetadataEntry.builder(handle.getMetadataEntry())
.setSchemaString(serializeSchemaAsJson(deltaTable))
.setConfiguration(configuration));
transactionLogWriter.flush();
}
catch (Exception e) {
Expand Down Expand Up @@ -1715,14 +1706,11 @@ public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandl
appendTableEntries(
commitVersion,
transactionLogWriter,
metadataEntry.getId(),
serializeSchemaAsJson(deltaTable),
metadataEntry.getOriginalPartitionColumns(),
metadataEntry.getConfiguration(),
DROP_COLUMN_OPERATION,
session,
Optional.ofNullable(metadataEntry.getDescription()),
protocolEntry);
protocolEntry,
MetadataEntry.builder(metadataEntry)
.setSchemaString(serializeSchemaAsJson(deltaTable)));
transactionLogWriter.flush();
}
catch (Exception e) {
Expand Down Expand Up @@ -1783,14 +1771,12 @@ public void renameColumn(ConnectorSession session, ConnectorTableHandle tableHan
appendTableEntries(
commitVersion,
transactionLogWriter,
metadataEntry.getId(),
serializeSchemaAsJson(deltaTable),
partitionColumns,
metadataEntry.getConfiguration(),
RENAME_COLUMN_OPERATION,
session,
Optional.ofNullable(metadataEntry.getDescription()),
protocolEntry);
protocolEntry,
MetadataEntry.builder(metadataEntry)
.setSchemaString(serializeSchemaAsJson(deltaTable))
.setPartitionColumns(partitionColumns));
transactionLogWriter.flush();
// Don't update extended statistics because it uses physical column names internally
}
Expand Down Expand Up @@ -1820,14 +1806,11 @@ public void dropNotNullConstraint(ConnectorSession session, ConnectorTableHandle
appendTableEntries(
table.getReadVersion() + 1,
transactionLogWriter,
metadataEntry.getId(),
serializeSchemaAsJson(deltaTable),
metadataEntry.getOriginalPartitionColumns(),
metadataEntry.getConfiguration(),
CHANGE_COLUMN_OPERATION,
session,
Optional.ofNullable(metadataEntry.getDescription()),
protocolEntry);
protocolEntry,
MetadataEntry.builder(metadataEntry)
.setSchemaString(serializeSchemaAsJson(deltaTable)));
transactionLogWriter.flush();
}
catch (Exception e) {
Expand All @@ -1838,30 +1821,16 @@ public void dropNotNullConstraint(ConnectorSession session, ConnectorTableHandle
private void appendTableEntries(
long commitVersion,
TransactionLogWriter transactionLogWriter,
String tableId,
String schemaString,
List<String> partitionColumnNames,
Map<String, String> configuration,
String operation,
ConnectorSession session,
Optional<String> comment,
ProtocolEntry protocolEntry)
ProtocolEntry protocolEntry,
MetadataEntry.Builder metadataEntry)
{
long createdTime = System.currentTimeMillis();
transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(session, IsolationLevel.WRITESERIALIZABLE, commitVersion, createdTime, operation, 0, true));

transactionLogWriter.appendProtocolEntry(protocolEntry);

transactionLogWriter.appendMetadataEntry(
new MetadataEntry(
tableId,
null,
comment.orElse(null),
new Format("parquet", ImmutableMap.of()),
schemaString,
partitionColumnNames,
ImmutableMap.copyOf(configuration),
createdTime));
transactionLogWriter.appendMetadataEntry(metadataEntry.setCreatedTime(createdTime).build());
}

private static void appendAddFileEntries(TransactionLogWriter transactionLogWriter, List<DataFileInfo> dataFileInfos, List<String> partitionColumnNames, List<String> originalColumnNames, boolean dataChange)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode;
import io.trino.spi.TrinoException;
Expand All @@ -25,6 +26,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.UUID;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
Expand All @@ -33,6 +35,7 @@
import static java.lang.Long.parseLong;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;

public class MetadataEntry
{
Expand Down Expand Up @@ -215,5 +218,83 @@ public String toString()
id, name, description, format, schemaString, partitionColumns, configuration, createdTime);
}

/**
 * Returns a builder for a new {@link MetadataEntry} with defaults suitable for a
 * freshly created table (random id, parquet format, no partition columns).
 */
public static Builder builder()
{
return new Builder();
}

/**
 * Returns a builder pre-populated with every field of the given entry, for producing
 * a modified copy (e.g. a schema or comment change on an existing table).
 */
public static Builder builder(MetadataEntry metadataEntry)
{
return new Builder(metadataEntry);
}

/**
 * Builder for {@link MetadataEntry}. Obtain via {@link MetadataEntry#builder()} for a
 * new table, or {@link MetadataEntry#builder(MetadataEntry)} to copy an existing entry.
 * Collections passed to setters are defensively copied into immutable collections.
 */
public static class Builder
{
    // Defaults reflect a newly created Delta table: fresh random id, parquet format,
    // empty description, no partition columns. schemaString and configuration have no
    // default and must be set before build() — TODO confirm whether the MetadataEntry
    // constructor tolerates nulls for them.
    private String id = UUID.randomUUID().toString();
    private String name;
    private Optional<String> description = Optional.empty();
    private Format format = new Format("parquet", ImmutableMap.of());
    private String schemaString;
    private List<String> partitionColumns = ImmutableList.of();
    private Map<String, String> configuration;
    private long createdTime;

    private Builder() {}

    private Builder(MetadataEntry metadataEntry)
    {
        requireNonNull(metadataEntry, "metadataEntry is null");
        id = metadataEntry.id;
        name = metadataEntry.name;
        description = Optional.ofNullable(metadataEntry.description);
        format = metadataEntry.format;
        schemaString = metadataEntry.schemaString;
        partitionColumns = ImmutableList.copyOf(metadataEntry.partitionColumns);
        configuration = ImmutableMap.copyOf(metadataEntry.configuration);
        createdTime = metadataEntry.createdTime;
    }

    public Builder setId(String id)
    {
        // Fail fast with a descriptive message instead of surfacing an anonymous NPE later
        this.id = requireNonNull(id, "id is null");
        return this;
    }

    public Builder setDescription(Optional<String> description)
    {
        // The Optional itself must not be null; an absent description is Optional.empty()
        this.description = requireNonNull(description, "description is null");
        return this;
    }

    public Builder setSchemaString(String schemaString)
    {
        this.schemaString = requireNonNull(schemaString, "schemaString is null");
        return this;
    }

    public Builder setPartitionColumns(List<String> partitionColumns)
    {
        // copyOf also rejects null argument and null elements
        this.partitionColumns = ImmutableList.copyOf(partitionColumns);
        return this;
    }

    public Builder setConfiguration(Map<String, String> configuration)
    {
        // copyOf also rejects null argument and null keys/values
        this.configuration = ImmutableMap.copyOf(configuration);
        return this;
    }

    public Builder setCreatedTime(long createdTime)
    {
        this.createdTime = createdTime;
        return this;
    }

    /**
     * Creates the {@link MetadataEntry}; an absent description is stored as null,
     * matching the entry's nullable field representation.
     */
    public MetadataEntry build()
    {
        return new MetadataEntry(id, name, description.orElse(null), format, schemaString, partitionColumns, configuration, createdTime);
    }
}

// Storage format descriptor of the table; the builder defaults it to parquet with no options.
public record Format(String provider, Map<String, String> options) {}
}

0 comments on commit d9ece50

Please sign in to comment.