Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] [Kernel]Make kernel lib suit the standalone. #3171

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import io.delta.kernel.data.ArrayValue;
import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.MapValue;
import io.delta.kernel.defaults.internal.data.vector.DefaultGenericVector;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.types.*;

Expand Down Expand Up @@ -50,6 +49,8 @@ public static StructType getSchema() {

public static Metadata getKernelMetadata() {
StructType schema = getSchema();
Map<String, String> config = new HashMap<>();
config.put("1", "2");
return new io.delta.kernel.internal.actions.Metadata(
"id",
Optional.ofNullable("name"),
Expand Down Expand Up @@ -92,24 +93,7 @@ public String getString(int rowId) {
}
},
Optional.ofNullable(1234L),
new MapValue() { // conf
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make the code cleanup here as a separate PR.

@Override
public int getSize() {
return 1;
}
@Override
public ColumnVector getKeys() {
return DefaultGenericVector.fromArray(
IntegerType.INTEGER, new Integer[]{new Integer(1)}
);
}
@Override
public ColumnVector getValues() {
return DefaultGenericVector.fromArray(
IntegerType.INTEGER, new Integer[]{new Integer(2)}
);
}
}
config
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ public enum Operation {
*/
STREAMING_UPDATE("STREAMING UPDATE"),

/**
 * Schema upgrade operation.
 */
UPGRADE_SCHEMA("UPGRADE SCHEMA"),

/**
 * Compaction operation.
 */
COMPACT("COMPACT"),

/**
 * Vacuum operation. The description mirrors the constant name, as it does for the other
 * operations; it must not reuse the "UPGRADE SCHEMA" description, otherwise the two
 * operations cannot be told apart by their description.
 */
VACUUM("VACUUM"),

/**
* For any operation that doesn't fit the above categories.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.delta.kernel;

import java.util.List;
import java.util.Map;

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.engine.Engine;
Expand Down Expand Up @@ -48,6 +49,8 @@ public interface TransactionBuilder {
*/
TransactionBuilder withPartitionColumns(Engine engine, List<String> partitionColumns);

/**
 * Set the table configuration (string key/value pairs) to use for this transaction.
 * NOTE(review): how the entries are merged with an existing table's configuration is
 * decided by the implementation — confirm and document the exact contract here.
 *
 * @param engine {@link Engine} instance to use.
 * @param configuration Map of configuration key to configuration value.
 * @return updated {@link TransactionBuilder} instance.
 */
TransactionBuilder withConfiguration(Engine engine, Map<String, String> configuration);

/**
* Set the transaction identifier for idempotent writes. Incremental processing systems (e.g.,
* streaming systems) that track progress using their own application-specific versions need to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,6 @@ CloseableIterator<ByteArrayInputStream> readFiles(
* @throws IOException for any IO error.
*/
boolean mkdirs(String path) throws IOException;

/**
 * Delete the given path.
 * NOTE(review): this declaration does not say whether deletion is recursive, nor what the
 * returned boolean means (presumably {@code true} when the path was deleted) — confirm
 * against the default implementation and document it here.
 *
 * @param path Path to delete.
 * @return boolean result of the delete request; see the note above.
 * @throws IOException for any IO error.
 */
boolean delete(String path) throws IOException;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This interface method and its default implementation can be a separate PR.

}
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,16 @@ public static KernelException unsupportedDataType(DataType dataType) {
return new KernelException("Kernel doesn't support writing data of type: " + dataType);
}

/**
 * Build the exception reported when a change of the partition columns is requested.
 *
 * @return a new {@link KernelException} carrying the fixed error message.
 */
public static KernelException unsupportedPartitionColumnsChange() {
    return new KernelException("Kernel doesn't support partition columns change.");
}

/**
 * Build the exception reported when a schema change would drop a column.
 *
 * @return a new {@link KernelException} carrying the fixed error message.
 */
public static KernelException unsupportedSchemaColumnDrop() {
    return new KernelException("Kernel doesn't support schema evolution drop column.");
}

public static KernelException unsupportedPartitionDataType(String colName, DataType dataType) {
String msgT = "Kernel doesn't support writing data with partition column (%s) of type: %s";
return new KernelException(format(msgT, colName, dataType));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,31 @@ public class TableConfig<T> {
"needs to be a positive integer."
);

/**
 * Table property {@code delta.logRetentionDuration}, with default value
 * {@code "interval 30 days"}. The string value is converted to a {@code Long} by
 * {@code IntervalParserUtils::safeParseIntervalAsMillis} (presumably milliseconds, judging
 * by the parser name — confirm). The predicate passed here accepts every parsed value.
 */
public static final TableConfig<Long> LOG_RETENTION = new TableConfig<>(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These can be added in a separate PR.

"delta.logRetentionDuration",
"interval 30 days",
IntervalParserUtils::safeParseIntervalAsMillis,
value -> true,
"needs to be provided as a calendar interval such as '2 weeks'. Months "
+ "and years are not accepted. You may specify '365 days' for a year instead."
);

/**
 * Table property {@code delta.enableExpiredLogCleanup}, with default value {@code "true"}.
 * The string value is converted with {@link Boolean#valueOf(String)}, which yields
 * {@code false} for any text other than "true" (ignoring case).
 * NOTE(review): the predicate passed here accepts every value, so a non-boolean string is
 * presumably never rejected with the help message — confirm this is intended.
 */
public static final TableConfig<Boolean> ENABLE_EXPIRED_LOG_CLEANUP = new TableConfig<>(
"delta.enableExpiredLogCleanup",
"true",
Boolean::valueOf,
value -> true,
"needs to be a boolean."
);

/**
 * Table property {@code delta.appendOnly}, with default value {@code "false"}.
 * The string value is converted with {@link Boolean#valueOf(String)}, which yields
 * {@code false} for any text other than "true" (ignoring case).
 * NOTE(review): the predicate passed here accepts every value, so a non-boolean string is
 * presumably never rejected with the help message — confirm this is intended.
 */
public static final TableConfig<Boolean> IS_APPEND_ONLY = new TableConfig<>(
"delta.appendOnly",
"false",
Boolean::valueOf,
value -> true,
"needs to be a boolean."
);

private final String key;
private final String defaultValue;
private final Function<String, T> fromString;
Expand All @@ -79,6 +104,10 @@ private TableConfig(
this.helpMessage = helpMessage;
}

/**
 * Returns the key of this table property.
 *
 * @return the property key held by this {@code TableConfig}.
 */
public String key() {
    return this.key;
}

/**
* Returns the value of the table property from the given metadata.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@
package io.delta.kernel.internal;

import java.util.*;
import java.util.Map.Entry;
import static java.util.Objects.requireNonNull;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.delta.kernel.*;
import io.delta.kernel.data.ArrayValue;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.types.StructType;

import io.delta.kernel.internal.actions.*;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.replay.LogReplay;
Expand All @@ -34,12 +35,12 @@
import io.delta.kernel.internal.util.SchemaUtils;
import io.delta.kernel.internal.util.Tuple2;
import static io.delta.kernel.internal.DeltaErrors.requiresSchemaForNewTable;
import static io.delta.kernel.internal.DeltaErrors.tableAlreadyExists;
import static io.delta.kernel.internal.DeltaErrors.unsupportedPartitionColumnsChange;
import static io.delta.kernel.internal.TransactionImpl.DEFAULT_READ_VERSION;
import static io.delta.kernel.internal.TransactionImpl.DEFAULT_WRITE_VERSION;
import static io.delta.kernel.internal.util.SchemaUtils.casePreservingPartitionColNames;
import static io.delta.kernel.internal.util.VectorUtils.stringArrayValue;
import static io.delta.kernel.internal.util.VectorUtils.stringStringMapValue;


public class TransactionBuilderImpl implements TransactionBuilder {
private static final Logger logger = LoggerFactory.getLogger(TransactionBuilderImpl.class);
Expand All @@ -51,6 +52,8 @@ public class TransactionBuilderImpl implements TransactionBuilder {
private Optional<StructType> schema = Optional.empty();
private Optional<List<String>> partitionColumns = Optional.empty();
private Optional<SetTransaction> setTxnOpt = Optional.empty();
private Optional<Map<String, String>> configuration = Optional.empty();
private boolean metadataChange = false;

public TransactionBuilderImpl(TableImpl table, String engineInfo, Operation operation) {
this.table = table;
Expand All @@ -72,6 +75,14 @@ public TransactionBuilder withPartitionColumns(Engine engine, List<String> parti
return this;
}

/**
 * Record the table configuration to use for this transaction. An empty map is ignored and
 * leaves any previously supplied configuration untouched.
 *
 * @param engine {@link Engine} instance (not used by this method).
 * @param configuration Map of configuration key to value; must not be null.
 * @return this builder.
 * @throws NullPointerException if {@code configuration} is null.
 */
@Override
public TransactionBuilder withConfiguration(Engine engine, Map<String, String> configuration) {
    requireNonNull(configuration, "configuration is null");
    if (!configuration.isEmpty()) {
        // Keep a private copy so that later changes to the caller's map cannot alter the
        // transaction being built, and so that an unmodifiable input map is still usable.
        this.configuration = Optional.of(new HashMap<>(configuration));
    }
    return this;
}

@Override
public TransactionBuilder withTransactionId(
Engine engine,
Expand All @@ -85,11 +96,50 @@ public TransactionBuilder withTransactionId(
return this;
}

/**
 * Check that the partition columns and schema given to this builder are compatible with
 * the table state in the given snapshot.
 *
 * <ul>
 *   <li>When partition columns were given, they must match the snapshot's partition columns
 *       in count, order and exact name; otherwise
 *       {@code unsupportedPartitionColumnsChange()} is thrown.</li>
 *   <li>When a schema was given, it is checked against the snapshot's schema by
 *       {@code SchemaUtils.validateSchemaEvolution}.</li>
 * </ul>
 *
 * @param snapshot snapshot whose metadata is the baseline for the comparison.
 */
private void validateTableEvolution(SnapshotImpl snapshot) {
    Metadata metadata = snapshot.getMetadata();
    ArrayValue existingPartitionColumns = metadata.getPartitionColumns();

    // ifPresent is used because these checks are pure side effects (they only throw).
    this.partitionColumns.ifPresent(requestedPartitionColumns -> {
        int size = existingPartitionColumns.getSize();
        if (size != requestedPartitionColumns.size()) {
            throw unsupportedPartitionColumnsChange();
        }
        for (int i = 0; i < size; i++) {
            String existingColumn = existingPartitionColumns.getElements().getString(i);
            // NOTE(review): the comparison is case-sensitive — confirm this is intended.
            if (!existingColumn.equals(requestedPartitionColumns.get(i))) {
                throw unsupportedPartitionColumnsChange();
            }
        }
    });

    this.schema.ifPresent(requestedSchema -> {
        StructType existingSchema = metadata.getSchema();
        SchemaUtils.validateSchemaEvolution(existingSchema, requestedSchema);
    });
}

@Override
public Transaction build(Engine engine) {
SnapshotImpl snapshot;
try {
snapshot = (SnapshotImpl) table.getLatestSnapshot(engine);
validateTableEvolution(snapshot);
Metadata metadata = snapshot.getMetadata();
configuration.map(ele -> {
for (Entry<String, String> entry : ele.entrySet()) {
if (metadata.updateConfiguration(entry.getKey(), entry.getValue())) {
metadataChange = true;
}
}
return null;
});
schema.map(ele -> {
metadataChange = true;
metadata.updateSchema(ele);
metadata.updateSchemaString(ele.toJson());
return null;
});
} catch (TableNotFoundException tblf) {
String tablePath = table.getPath(engine);
logger.info("Table {} doesn't exist yet. Trying to create a new table.", tablePath);
Expand All @@ -102,7 +152,7 @@ public Transaction build(Engine engine) {
}

boolean isNewTable = snapshot.getVersion(engine) < 0;
validate(engine, snapshot, isNewTable);
validate(engine, snapshot);

return new TransactionImpl(
isNewTable,
Expand All @@ -113,40 +163,30 @@ public Transaction build(Engine engine) {
operation,
snapshot.getProtocol(),
snapshot.getMetadata(),
setTxnOpt);
setTxnOpt,
metadataChange);
}


/**
* Validate the given parameters for the transaction.
*/
private void validate(Engine engine, SnapshotImpl snapshot, boolean isNewTable) {
private void validate(Engine engine, SnapshotImpl snapshot) {
String tablePath = table.getPath(engine);
// Validate the table has no features that Kernel doesn't yet support writing into it.
TableFeatures.validateWriteSupportedTable(
snapshot.getProtocol(),
snapshot.getMetadata(),
snapshot.getMetadata().getSchema(),
tablePath);

if (!isNewTable) {
if (schema.isPresent()) {
throw tableAlreadyExists(
tablePath,
"Table already exists, but provided a new schema. " +
"Schema can only be set on a new table.");
}
if (partitionColumns.isPresent()) {
throw tableAlreadyExists(
tablePath,
"Table already exists, but provided new partition columns. "
+ "Partition columns can only be set on a new table.");
}
} else {
// New table verify the given schema and partition columns
SchemaUtils.validateSchema(schema.get(), false /* isColumnMappingEnabled */);
// New table verify the given schema and partition columns
schema.map(ele -> {
SchemaUtils.validateSchema(ele, false /* isColumnMappingEnabled */);
SchemaUtils.validatePartitionColumns(
schema.get(), partitionColumns.orElse(Collections.emptyList()));
}
ele, partitionColumns.orElse(Collections.emptyList()));
return null;
});


setTxnOpt.ifPresent(txnId -> {
Optional<Long> lastTxnVersion = snapshot.getLatestTransactionVersion(txnId.getAppId());
Expand Down Expand Up @@ -201,7 +241,7 @@ private Metadata getInitialMetadata() {
schema.get(), /* schema */
stringArrayValue(partitionColumnsCasePreserving), /* partitionColumns */
Optional.of(currentTimeMillis), /* createdTime */
stringStringMapValue(Collections.emptyMap()) /* configuration */
configuration.orElse(Collections.emptyMap()) /* configuration */
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class TransactionImpl
private final Metadata metadata;
private final SnapshotImpl readSnapshot;
private final Optional<SetTransaction> setTxnOpt;
private boolean metadataChange;

private boolean closed; // To avoid trying to commit the same transaction again.

Expand All @@ -83,6 +84,21 @@ public TransactionImpl(
Protocol protocol,
Metadata metadata,
Optional<SetTransaction> setTxnOpt) {
this(isNewTable, dataPath, logPath, readSnapshot, engineInfo, operation, protocol, metadata,
setTxnOpt, false);
}

public TransactionImpl(
boolean isNewTable,
Path dataPath,
Path logPath,
SnapshotImpl readSnapshot,
String engineInfo,
Operation operation,
Protocol protocol,
Metadata metadata,
Optional<SetTransaction> setTxnOpt,
boolean metadataChange) {
this.isNewTable = isNewTable;
this.dataPath = dataPath;
this.logPath = logPath;
Expand All @@ -92,6 +108,7 @@ public TransactionImpl(
this.protocol = protocol;
this.metadata = metadata;
this.setTxnOpt = setTxnOpt;
this.metadataChange = metadataChange;
}

@Override
Expand Down Expand Up @@ -151,8 +168,7 @@ private TransactionCommitResult doCommit(
throws FileAlreadyExistsException {
List<Row> metadataActions = new ArrayList<>();
metadataActions.add(createCommitInfoSingleAction(generateCommitAction()));
if (isNewTable) {
// In the future, we need to add metadata and action when there are any changes to them.
if (isNewTable || metadataChange) {
metadataActions.add(createMetadataSingleAction(metadata.toRow()));
metadataActions.add(createProtocolSingleAction(protocol.toRow()));
}
Expand Down
Loading
Loading