diff --git a/kernel/build.sbt b/kernel/build.sbt index 257e4ea4f6..17a5945f68 100644 --- a/kernel/build.sbt +++ b/kernel/build.sbt @@ -78,17 +78,11 @@ lazy val kernelDefault = (project in file("kernel-default")) scalaStyleSettings, releaseSettings, libraryDependencies ++= Seq( - "org.apache.hadoop" % "hadoop-client-api" % hadoopVersion, // Configuration, Path - "io.delta" % "delta-storage" % deltaStorageVersion, // LogStore - "com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5", // ObjectMapper + "org.apache.hadoop" % "hadoop-client-runtime" % hadoopVersion, + "com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5", "org.apache.parquet" % "parquet-hadoop" % "1.12.3", "org.scalatest" %% "scalatest" % scalaTestVersion % "test", - "io.delta" %% "delta-core" % deltaSparkVersion % "test", - "org.apache.spark" %% "spark-sql" % sparkVersion % "test", // SparkSession - "org.apache.spark" %% "spark-sql" % sparkVersion % "test" classifier "tests", - "org.apache.spark" %% "spark-core" % sparkVersion % "test" classifier "tests", - "org.apache.spark" %% "spark-catalyst" % sparkVersion % "test" classifier "tests", "junit" % "junit" % "4.11" % "test", "com.novocode" % "junit-interface" % "0.11" % "test" ) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/data/ColumnarBatch.java b/kernel/kernel-api/src/main/java/io/delta/kernel/data/ColumnarBatch.java index 6157cd796d..873cfb3877 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/data/ColumnarBatch.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/data/ColumnarBatch.java @@ -19,6 +19,8 @@ import io.delta.kernel.types.StructType; import io.delta.kernel.utils.CloseableIterator; +import io.delta.kernel.internal.ColumnarBatchRow; + /** * Represents zero or more rows of records with same schema type. */ @@ -55,8 +57,30 @@ default ColumnarBatch slice(int start, int end) { /** * @return iterator of {@link Row}s in this batch */ - default CloseableIterator getRows() { - // TODO needs io.delta.kernel.internal.ColumnarBatchRow - throw new UnsupportedOperationException("Not yet implemented!"); + default CloseableIterator getRows() + { + final ColumnarBatch batch = this; + return new CloseableIterator() + { + int rowId = 0; + int maxRowId = getSize(); + + @Override + public boolean hasNext() + { + return rowId < maxRowId; + } + + @Override + public Row next() + { + Row row = new ColumnarBatchRow(batch, rowId); + rowId += 1; + return row; + } + + @Override + public void close() {} + }; } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/data/Row.java b/kernel/kernel-api/src/main/java/io/delta/kernel/data/Row.java index 06c16a0b31..106b2bf357 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/data/Row.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/data/Row.java @@ -24,7 +24,8 @@ /** * Represent a single record */ -public interface Row { +public interface Row +{ /** * @return Schema of the record. @@ -43,6 +44,18 @@ public interface Row { */ boolean getBoolean(int ordinal); + /** + * Return byte value of the column located at the given ordinal. + * Throws error if the column at given ordinal is not of boolean type, + */ + byte getByte(int ordinal); + + /** + * Return short value of the column located at the given ordinal. + * Throws error if the column at given ordinal is not of boolean type, + */ + short getShort(int ordinal); + /** * Return integer value of the column located at the given ordinal. * Throws error if the column at given ordinal is not of integer type, @@ -55,12 +68,30 @@ public interface Row { */ long getLong(int ordinal); + /** + * Return float value of the column located at the given ordinal. + * Throws error if the column at given ordinal is not of long type, + */ + float getFloat(int ordinal); + + /** + * Return double value of the column located at the given ordinal. + * Throws error if the column at given ordinal is not of long type, + */ + double getDouble(int ordinal); + /** * Return string value of the column located at the given ordinal. * Throws error if the column at given ordinal is not of varchar type, */ String getString(int ordinal); + /** + * Return binary value of the column located at the given ordinal. + * Throws error if the column at given ordinal is not of varchar type, + */ + byte[] getBinary(int ordinal); + /** * Return struct value of the column located at the given ordinal. * Throws error if the column at given ordinal is not of struct type, diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/expressions/Literal.java b/kernel/kernel-api/src/main/java/io/delta/kernel/expressions/Literal.java index 4c678847b6..897daecf63 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/expressions/Literal.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/expressions/Literal.java @@ -16,14 +16,26 @@ package io.delta.kernel.expressions; +import java.sql.Date; +import java.sql.Timestamp; import java.util.Objects; import io.delta.kernel.data.Row; +import io.delta.kernel.types.ArrayType; +import io.delta.kernel.types.BinaryType; import io.delta.kernel.types.BooleanType; +import io.delta.kernel.types.ByteType; import io.delta.kernel.types.DataType; +import io.delta.kernel.types.DateType; +import io.delta.kernel.types.DoubleType; +import io.delta.kernel.types.FloatType; import io.delta.kernel.types.IntegerType; import io.delta.kernel.types.LongType; +import io.delta.kernel.types.MapType; +import io.delta.kernel.types.ShortType; import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructType; +import io.delta.kernel.types.TimestampType; /** * A literal value. @@ -31,93 +43,174 @@ * Only supports primitive data types, see * Delta Transaction Log Protocol: Primitive Types. */ -public final class Literal extends LeafExpression { - - //////////////////////////////////////////////////////////////////////////////// - // Static Fields / Methods - //////////////////////////////////////////////////////////////////////////////// - +public final class Literal extends LeafExpression +{ public static final Literal TRUE = Literal.of(true); public static final Literal FALSE = Literal.of(false); - /** - * Create an integer {@link Literal} object - * @param value integer value - * @return a {@link Literal} with data type {@link IntegerType} - */ - public static Literal of(int value) { - return new Literal(value, IntegerType.INSTANCE); - } - /** * Create a boolean {@link Literal} object + * * @param value boolean value * @return a {@link Literal} with data type {@link BooleanType} */ - public static Literal of(boolean value) { + public static Literal of(boolean value) + { return new Literal(value, BooleanType.INSTANCE); } + /** + * @return a {@link Literal} with data type {@link ByteType} + */ + public static Literal of(byte value) + { + return new Literal(value, ByteType.INSTANCE); + } + + /** + * @return a {@link Literal} with data type {@link ShortType} + */ + public static Literal of(short value) + { + return new Literal(value, ShortType.INSTANCE); + } + + /** + * Create an integer {@link Literal} object + * + * @param value integer value + * @return a {@link Literal} with data type {@link IntegerType} + */ + public static Literal of(int value) + { + return new Literal(value, IntegerType.INSTANCE); + } + /** * Create a long {@link Literal} object + * * @param value long value * @return a {@link Literal} with data type {@link LongType} */ - public static Literal of(long value) { + public static Literal of(long value) + { return new Literal(value, LongType.INSTANCE); } + /** + * @return a {@link Literal} with data type {@link FloatType} + */ + public static Literal of(float value) + { + return new Literal(value, FloatType.INSTANCE); + } + + /** + * @return a {@link Literal} with data type {@link DoubleType} + */ + public static Literal of(double value) + { + return new Literal(value, DoubleType.INSTANCE); + } + /** * Create a string {@link Literal} object + * * @param value string value * @return a {@link Literal} with data type {@link StringType} */ - public static Literal of(String value) { + public static Literal of(String value) + { return new Literal(value, StringType.INSTANCE); } - //////////////////////////////////////////////////////////////////////////////// - // Instance Fields / Methods - //////////////////////////////////////////////////////////////////////////////// + /** + * @return a {@link Literal} with data type {@link BinaryType} + */ + public static Literal of(byte[] value) + { + return new Literal(value, BinaryType.INSTANCE); + } + + /** + * @return a {@link Literal} with data type {@link DateType} + */ + public static Literal of(Date value) + { + return new Literal(value, DateType.INSTANCE); + } + + /** + * @return a {@link Literal} with data type {@link TimestampType} + */ + public static Literal of(Timestamp value) + { + return new Literal(value, TimestampType.INSTANCE); + } + + /** + * @return a null {@link Literal} with the given data type + */ + public static Literal ofNull(DataType dataType) + { + if (dataType instanceof ArrayType + || dataType instanceof MapType + || dataType instanceof StructType) { + throw new IllegalArgumentException( + dataType + " is an invalid data type for Literal."); + } + return new Literal(null, dataType); + } private final Object value; private final DataType dataType; - private Literal(Object value, DataType dataType) { + private Literal(Object value, DataType dataType) + { this.value = value; this.dataType = dataType; } - public Object value() { + public Object value() + { return value; } @Override - public Object eval(Row record) { + public Object eval(Row record) + { return value; } @Override - public DataType dataType() { + public DataType dataType() + { return dataType; } @Override - public String toString() { + public String toString() + { return String.valueOf(value); } @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } Literal literal = (Literal) o; return Objects.equals(value, literal.value) && Objects.equals(dataType, literal.dataType); } @Override - public int hashCode() { + public int hashCode() + { return Objects.hash(value, dataType); } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/ColumnarBatchRow.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/ColumnarBatchRow.java new file mode 100644 index 0000000000..24e2b3a4ea --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/ColumnarBatchRow.java @@ -0,0 +1,130 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.Row; +import io.delta.kernel.types.StructType; + +/** + * Row abstraction around a columnar batch and a particular row within the columnar batch. + */ +public class ColumnarBatchRow + implements Row +{ + private final ColumnarBatch columnarBatch; + private final int rowId; + + public ColumnarBatchRow(ColumnarBatch columnarBatch, int rowId) + { + this.columnarBatch = Objects.requireNonNull(columnarBatch, "columnarBatch is null"); + this.rowId = rowId; + } + + @Override + public StructType getSchema() + { + return columnarBatch.getSchema(); + } + + @Override + public boolean isNullAt(int ordinal) + { + return columnVector(ordinal).isNullAt(rowId); + } + + @Override + public boolean getBoolean(int ordinal) + { + return columnVector(ordinal).getBoolean(rowId); + } + + @Override + public byte getByte(int ordinal) + { + return columnVector(ordinal).getByte(rowId); + } + + @Override + public short getShort(int ordinal) + { + return columnVector(ordinal).getShort(rowId); + } + + @Override + public int getInt(int ordinal) + { + return columnVector(ordinal).getInt(rowId); + } + + @Override + public long getLong(int ordinal) + { + return columnVector(ordinal).getLong(rowId); + } + + @Override + public float getFloat(int ordinal) + { + return columnVector(ordinal).getFloat(rowId); + } + + @Override + public double getDouble(int ordinal) + { + return columnVector(ordinal).getDouble(rowId); + } + + @Override + public String getString(int ordinal) + { + return columnVector(ordinal).getString(rowId); + } + + @Override + public byte[] getBinary(int ordinal) + { + return columnVector(ordinal).getBinary(rowId); + } + + @Override + public Row getStruct(int ordinal) + { + return columnVector(ordinal).getStruct(rowId); + } + + @Override + public List getArray(int ordinal) + { + return columnVector(ordinal).getArray(rowId); + } + + @Override + public Map getMap(int ordinal) + { + return columnVector(ordinal).getMap(rowId); + } + + private ColumnVector columnVector(int ordinal) + { + return columnarBatch.getColumnVector(ordinal); + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/utils/CloseableIterator.java b/kernel/kernel-api/src/main/java/io/delta/kernel/utils/CloseableIterator.java index d32e92ca8c..04c5193f9c 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/utils/CloseableIterator.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/utils/CloseableIterator.java @@ -22,10 +22,13 @@ import java.util.function.Consumer; import java.util.function.Function; -public interface CloseableIterator extends Iterator, Closeable { - default CloseableIterator map(Function mapper) { +public interface CloseableIterator extends Iterator, Closeable +{ + default CloseableIterator map(Function mapper) + { CloseableIterator delegate = this; - return new CloseableIterator() { + return new CloseableIterator() + { @Override public void remove() { @@ -52,7 +55,7 @@ public U next() @Override public void close() - throws IOException + throws IOException { delegate.close(); } diff --git a/kernel/kernel-api/src/test/java/io/delta/kernel/internal/types/JsonHandlerTestImpl.java b/kernel/kernel-api/src/test/java/io/delta/kernel/internal/types/JsonHandlerTestImpl.java index 25e4314108..c5f43e420c 100644 --- a/kernel/kernel-api/src/test/java/io/delta/kernel/internal/types/JsonHandlerTestImpl.java +++ b/kernel/kernel-api/src/test/java/io/delta/kernel/internal/types/JsonHandlerTestImpl.java @@ -230,6 +230,18 @@ public boolean getBoolean(int ordinal) return (boolean) parsedValues[ordinal]; } + @Override + public byte getByte(int ordinal) + { + throw new UnsupportedOperationException("not yet implemented - test only"); + } + + @Override + public short getShort(int ordinal) + { + throw new UnsupportedOperationException("not yet implemented - test only"); + } + @Override public int getInt(int ordinal) { @@ -242,12 +254,30 @@ public long getLong(int ordinal) return (long) parsedValues[ordinal]; } + @Override + public float getFloat(int ordinal) + { + throw new UnsupportedOperationException("not yet implemented - test only"); + } + + @Override + public double getDouble(int ordinal) + { + throw new UnsupportedOperationException("not yet implemented - test only"); + } + @Override public String getString(int ordinal) { return (String) parsedValues[ordinal]; } + @Override + public byte[] getBinary(int ordinal) + { + throw new UnsupportedOperationException("not yet implemented - test only"); + } + @Override public Row getStruct(int ordinal) { diff --git a/kernel/kernel-default/src/main/java/io/delta/kernel/DefaultKernelUtils.java b/kernel/kernel-default/src/main/java/io/delta/kernel/DefaultKernelUtils.java new file mode 100644 index 0000000000..f510fe81c0 --- /dev/null +++ b/kernel/kernel-default/src/main/java/io/delta/kernel/DefaultKernelUtils.java @@ -0,0 +1,99 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel; + +import java.sql.Date; +import java.time.LocalDate; +import java.time.temporal.ChronoUnit; + +public class DefaultKernelUtils +{ + private static final LocalDate EPOCH = LocalDate.ofEpochDay(0); + + private DefaultKernelUtils() + { + } + + /** + * Precondition-style validation that throws {@link IllegalArgumentException}. + * + * @param isValid {@code true} if valid, {@code false} if an exception should be thrown + * @throws IllegalArgumentException if {@code isValid} is false + */ + public static void checkArgument(boolean isValid) + throws IllegalArgumentException + { + if (!isValid) { + throw new IllegalArgumentException(); + } + } + + /** + * Precondition-style validation that throws {@link IllegalArgumentException}. + * + * @param isValid {@code true} if valid, {@code false} if an exception should be thrown + * @param message A String message for the exception. + * @throws IllegalArgumentException if {@code isValid} is false + */ + public static void checkArgument(boolean isValid, String message) + throws IllegalArgumentException + { + if (!isValid) { + throw new IllegalArgumentException(message); + } + } + + /** + * Precondition-style validation that throws {@link IllegalArgumentException}. + * + * @param isValid {@code true} if valid, {@code false} if an exception should be thrown + * @param message A String message for the exception. + * @param args Objects used to fill in {@code %s} placeholders in the message + * @throws IllegalArgumentException if {@code isValid} is false + */ + public static void checkArgument(boolean isValid, String message, Object... args) + throws IllegalArgumentException + { + if (!isValid) { + throw new IllegalArgumentException( + String.format(String.valueOf(message), args)); + } + } + + /** + * Precondition-style validation that throws {@link IllegalStateException}. + * + * @param isValid {@code true} if valid, {@code false} if an exception should be thrown + * @param message A String message for the exception. + * @throws IllegalStateException if {@code isValid} is false + */ + public static void checkState(boolean isValid, String message) + throws IllegalStateException + { + if (!isValid) { + throw new IllegalStateException(message); + } + } + + /** + * Utility method to get the number of days since epoch this given date is. + * @param date + */ + public static int daysSinceEpoch(Date date) { + LocalDate localDate = date.toLocalDate(); + return (int) ChronoUnit.DAYS.between(EPOCH, localDate); + } +} diff --git a/kernel/kernel-default/src/main/java/io/delta/kernel/client/DefaultExpressionHandler.java b/kernel/kernel-default/src/main/java/io/delta/kernel/client/DefaultExpressionHandler.java new file mode 100644 index 0000000000..e2e92c920f --- /dev/null +++ b/kernel/kernel-default/src/main/java/io/delta/kernel/client/DefaultExpressionHandler.java @@ -0,0 +1,143 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.kernel.client; + +import static io.delta.kernel.DefaultKernelUtils.checkArgument; +import static io.delta.kernel.DefaultKernelUtils.daysSinceEpoch; +import java.sql.Date; +import java.sql.Timestamp; +import java.util.Optional; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.Row; +import io.delta.kernel.data.vector.DefaultBooleanVector; +import io.delta.kernel.data.vector.DefaultConstantVector; +import io.delta.kernel.expressions.Expression; +import io.delta.kernel.expressions.ExpressionEvaluator; +import io.delta.kernel.expressions.Literal; +import io.delta.kernel.types.BinaryType; +import io.delta.kernel.types.BooleanType; +import io.delta.kernel.types.ByteType; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.DateType; +import io.delta.kernel.types.DoubleType; +import io.delta.kernel.types.FloatType; +import io.delta.kernel.types.IntegerType; +import io.delta.kernel.types.LongType; +import io.delta.kernel.types.ShortType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructType; +import io.delta.kernel.types.TimestampType; +import io.delta.kernel.utils.CloseableIterator; + +public class DefaultExpressionHandler + implements ExpressionHandler +{ + @Override + public ExpressionEvaluator getEvaluator(StructType batchSchema, Expression expression) + { + return new DefaultExpressionEvaluator(expression); + } + + private static class DefaultExpressionEvaluator + implements ExpressionEvaluator + { + private final Expression expression; + + private DefaultExpressionEvaluator(Expression expression) + { + this.expression = expression; + } + + @Override + public ColumnVector eval(ColumnarBatch input) + { + if (expression instanceof Literal) { + return evalLiteralExpression(input, (Literal) expression); + } + + if (expression.dataType().equals(BooleanType.INSTANCE)) { + return evalBooleanOutputExpression(input, expression); + } + // TODO: Boolean output type expressions are good enough for first preview release + // which enables partition pruning and file skipping using file stats. + + throw new UnsupportedOperationException("not yet implemented"); + } + + @Override + public void close() { /* nothing to close */ } + } + + private static ColumnVector evalBooleanOutputExpression( + ColumnarBatch input, Expression expression) + { + checkArgument(expression.dataType().equals(BooleanType.INSTANCE), + "expression should return a boolean"); + + final int batchSize = input.getSize(); + boolean[] result = new boolean[batchSize]; + boolean[] nullResult = new boolean[batchSize]; + CloseableIterator rows = input.getRows(); + for (int currentIndex = 0; currentIndex < batchSize; currentIndex++) { + Object evalResult = expression.eval(rows.next()); + if (evalResult == null) { + nullResult[currentIndex] = true; + } + else { + result[currentIndex] = ((Boolean) evalResult).booleanValue(); + } + } + return new DefaultBooleanVector(batchSize, Optional.of(nullResult), result); + } + + private static ColumnVector evalLiteralExpression(ColumnarBatch input, Literal literal) + { + Object result = literal.value(); + DataType dataType = literal.dataType(); + int size = input.getSize(); + + if (result == null) { + return new DefaultConstantVector(dataType, size, null); + } + + if (dataType instanceof BooleanType || + dataType instanceof ByteType || + dataType instanceof ShortType || + dataType instanceof IntegerType || + dataType instanceof LongType || + dataType instanceof FloatType || + dataType instanceof DoubleType || + dataType instanceof StringType || + dataType instanceof BinaryType) { + return new DefaultConstantVector(dataType, size, result); + } + else if (dataType instanceof DateType) { + int numOfDaysSinceEpoch = daysSinceEpoch((Date) result); + return new DefaultConstantVector(dataType, size, numOfDaysSinceEpoch); + } + else if (dataType instanceof TimestampType) { + Timestamp timestamp = (Timestamp) result; + long micros = timestamp.getTime() * 1000; + return new DefaultConstantVector(dataType, size, micros); + } + + throw new UnsupportedOperationException( + "unsupported expression encountered: " + literal); + } +} diff --git a/kernel/kernel-default/src/main/java/io/delta/kernel/client/DefaultFileHandler.java b/kernel/kernel-default/src/main/java/io/delta/kernel/client/DefaultFileHandler.java new file mode 100644 index 0000000000..6214bf385d --- /dev/null +++ b/kernel/kernel-default/src/main/java/io/delta/kernel/client/DefaultFileHandler.java @@ -0,0 +1,38 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.client; + +import static java.util.Objects.requireNonNull; + +import io.delta.kernel.data.Row; +import io.delta.kernel.expressions.Expression; +import io.delta.kernel.utils.CloseableIterator; + +/** + * Default client implementation of {@link FileHandler}. It splits file as one split. + */ +public class DefaultFileHandler + implements FileHandler +{ + @Override + public CloseableIterator contextualizeFileReads( + CloseableIterator fileIter, Expression filter) + { + requireNonNull(fileIter, "fileIter is null"); + requireNonNull(filter, "filter is null"); + return fileIter.map(scanFileRow -> new DefaultFileReadContext(scanFileRow, filter)); + } +} diff --git a/kernel/kernel-default/src/main/java/io/delta/kernel/client/DefaultFileReadContext.java b/kernel/kernel-default/src/main/java/io/delta/kernel/client/DefaultFileReadContext.java new file mode 100644 index 0000000000..81902c0e23 --- /dev/null +++ b/kernel/kernel-default/src/main/java/io/delta/kernel/client/DefaultFileReadContext.java @@ -0,0 +1,45 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.client; + +import static java.util.Objects.requireNonNull; + +import io.delta.kernel.data.Row; +import io.delta.kernel.expressions.Expression; + +public class DefaultFileReadContext + implements FileReadContext +{ + private final Row scanFileRow; + private final Expression filter; + + public DefaultFileReadContext(Row scanFileRow, Expression filter) + { + this.scanFileRow = requireNonNull(scanFileRow, "scanFileRow is null"); + this.filter = requireNonNull(filter, "filter is null"); + } + + @Override + public Row getScanFileRow() + { + return this.scanFileRow; + } + + public Expression getFilter() + { + return this.filter; + } +} diff --git a/kernel/kernel-default/src/main/java/io/delta/kernel/client/DefaultFileSystemClient.java b/kernel/kernel-default/src/main/java/io/delta/kernel/client/DefaultFileSystemClient.java new file mode 100644 index 0000000000..be1a785b96 --- /dev/null +++ b/kernel/kernel-default/src/main/java/io/delta/kernel/client/DefaultFileSystemClient.java @@ -0,0 +1,87 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.client; + +import java.io.FileNotFoundException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import io.delta.kernel.fs.FileStatus; +import io.delta.kernel.utils.CloseableIterator; + +public class DefaultFileSystemClient + implements FileSystemClient +{ + private final Configuration hadoopConf; + + public DefaultFileSystemClient(Configuration hadoopConf) + { + this.hadoopConf = hadoopConf; + } + + @Override + public CloseableIterator listFrom(String filePath) + { + return new CloseableIterator() + { + private final Iterator iter; + + { + try { + Path path = new Path(filePath); + FileSystem fs = path.getFileSystem(hadoopConf); + if (!fs.exists(path.getParent())) { + throw new FileNotFoundException( + String.format("No such file or directory: %s", path.getParent()) + ); + } + org.apache.hadoop.fs.FileStatus[] files = fs.listStatus(path.getParent()); + iter = Arrays.stream(files) + .filter(f -> f.getPath().getName().compareTo(path.getName()) >= 0) + .sorted(Comparator.comparing(o -> o.getPath().getName())) + .iterator(); + } + catch (Exception ex) { + throw new RuntimeException("Could not resolve the FileSystem", ex); + } + } + + @Override + public boolean hasNext() + { + return iter.hasNext(); + } + + @Override + public FileStatus next() + { + final org.apache.hadoop.fs.FileStatus impl = iter.next(); + return FileStatus.of( + impl.getPath().toString(), + impl.getLen(), + impl.getModificationTime()); + } + + @Override + public void close() {} + }; + } +} diff --git a/kernel/kernel-default/src/main/java/io/delta/kernel/client/DefaultJsonHandler.java b/kernel/kernel-default/src/main/java/io/delta/kernel/client/DefaultJsonHandler.java new file mode 100644 index 0000000000..f8b06ddbba --- /dev/null +++ b/kernel/kernel-default/src/main/java/io/delta/kernel/client/DefaultJsonHandler.java @@ -0,0 +1,164 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.client; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.DefaultRowBasedColumnarBatch; +import io.delta.kernel.data.FileDataReadResult; +import io.delta.kernel.data.DefaultJsonRow; +import io.delta.kernel.data.Row; +import io.delta.kernel.fs.FileStatus; +import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.CloseableIterator; + +public class DefaultJsonHandler + extends DefaultFileHandler + implements JsonHandler +{ + private final ObjectMapper objectMapper; + private final Configuration hadoopConf; + private final int maxBatchSize; + + public DefaultJsonHandler(Configuration hadoopConf) + { + this.objectMapper = new ObjectMapper(); + this.hadoopConf = hadoopConf; + this.maxBatchSize = + hadoopConf.getInt("delta.kernel.default.json.reader.batch-size", 1024); + } + + @Override + public ColumnarBatch parseJson(ColumnVector jsonStringVector, StructType outputSchema) + { + List rows = new ArrayList<>(); + for (int i = 0; i < jsonStringVector.getSize(); i++) { + rows.add(parseJson(jsonStringVector.getString(i), outputSchema)); + } + return new DefaultRowBasedColumnarBatch(outputSchema, rows); + } + + @Override + public CloseableIterator readJsonFiles( + CloseableIterator fileIter, + StructType physicalSchema) throws IOException + { + return new CloseableIterator() { + private FileReadContext currentFile; + private BufferedReader currentFileReader; + private String nextLine; + + @Override + public void close() + throws IOException + { + if (currentFileReader != null) { + currentFileReader.close(); + } + + fileIter.close(); + // TODO: implement safe close of multiple closeables. + } + + @Override + public boolean hasNext() + { + // There is no file in reading or the current file being read has no more data + // initialize the next file reader or return false if there are no more files to + // read. + try { + if (currentFileReader == null || + (nextLine = currentFileReader.readLine()) == null) { + if (fileIter.hasNext()) { + currentFile = fileIter.next(); + FileStatus fileStatus = io.delta.kernel.utils.Utils.getFileStatus( + currentFile.getScanFileRow()); + Path filePath = new Path(fileStatus.getPath()); + FileSystem fs = filePath.getFileSystem(hadoopConf); + FSDataInputStream stream = fs.open(filePath); + currentFileReader = new BufferedReader( + new InputStreamReader(stream, StandardCharsets.UTF_8)); + nextLine = currentFileReader.readLine(); + } else { + return false; + } + } + } catch (IOException ex) { + throw new RuntimeException(ex); + } + + return nextLine != null; + } + + @Override + public FileDataReadResult next() + { + List rows = new ArrayList<>(); + int i = 0; + do { + // hasNext already reads the next one and keeps it in member variable `nextLine` + rows.add(parseJson(nextLine, physicalSchema)); + try { + nextLine = currentFileReader.readLine(); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } while(i < maxBatchSize && nextLine != null); + ColumnarBatch nextBatch = new DefaultRowBasedColumnarBatch(physicalSchema, rows); + + return new FileDataReadResult() { + @Override + public ColumnarBatch getData() + { + return nextBatch; + } + + @Override + public Row getScanFileRow() + { + return currentFile.getScanFileRow(); + } + }; + } + }; + } + + private Row parseJson(String json, StructType readSchema) + { + try { + final JsonNode jsonNode = objectMapper.readTree(json); + return new DefaultJsonRow((ObjectNode) jsonNode, readSchema); + } catch (JsonProcessingException ex) { + throw new RuntimeException(String.format("Could not parse JSON: %s", json), ex); + } + } +} diff --git a/kernel/kernel-default/src/main/java/io/delta/kernel/client/DefaultTableClient.java b/kernel/kernel-default/src/main/java/io/delta/kernel/client/DefaultTableClient.java new file mode 100644 index 0000000000..f8b4bf64ff --- /dev/null +++ b/kernel/kernel-default/src/main/java/io/delta/kernel/client/DefaultTableClient.java @@ -0,0 +1,62 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.client; + +import org.apache.hadoop.conf.Configuration; + +public class DefaultTableClient + implements TableClient +{ + private final Configuration hadoopConf; + + private DefaultTableClient(Configuration hadoopConf) + { + this.hadoopConf = hadoopConf; + } + + @Override + public ExpressionHandler getExpressionHandler() + { + return new DefaultExpressionHandler(); + } + + @Override + public JsonHandler getJsonHandler() + { + return new DefaultJsonHandler(hadoopConf); + } + + @Override + public FileSystemClient getFileSystemClient() + { + return new DefaultFileSystemClient(hadoopConf); + } + + @Override + public ParquetHandler getParquetHandler() + { + throw new UnsupportedOperationException("not yet implemented"); + } + + /** + * Create an instance of {@link DefaultTableClient}. + * @param hadoopConf Hadoop configuration to use. + * @return + */ + public static DefaultTableClient create(Configuration hadoopConf) { + return new DefaultTableClient(hadoopConf); + } +} diff --git a/kernel/kernel-default/src/main/java/io/delta/kernel/data/DefaultColumnarBatch.java b/kernel/kernel-default/src/main/java/io/delta/kernel/data/DefaultColumnarBatch.java new file mode 100644 index 0000000000..6057365eb1 --- /dev/null +++ b/kernel/kernel-default/src/main/java/io/delta/kernel/data/DefaultColumnarBatch.java @@ -0,0 +1,65 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.data; + +import io.delta.kernel.types.StructType; + +public class DefaultColumnarBatch + implements ColumnarBatch +{ + private final StructType dataType; + private final int size; + private final ColumnVector[] columnVectors; + + public DefaultColumnarBatch( + int size, + StructType dataType, + ColumnVector[] columnVectors + ) + { + this.dataType = dataType; + this.size = size; + this.columnVectors = new ColumnVector[columnVectors.length]; + // TODO: argument check. + System.arraycopy(columnVectors, 0, this.columnVectors, 0, columnVectors.length); + } + + @Override + public StructType getSchema() + { + return dataType; + } + + @Override + public ColumnVector getColumnVector(int ordinal) + { + checkColumnOrdinal(ordinal); + return columnVectors[ordinal]; + } + + @Override + public int getSize() + { + return size; + } + + private void checkColumnOrdinal(int ordinal) + { + if (ordinal < 0 || ordinal >= columnVectors.length) { + throw new IllegalArgumentException("invalid column ordinal"); + } + } +} \ No newline at end of file diff --git a/kernel/kernel-default/src/main/java/io/delta/kernel/data/DefaultJsonRow.java b/kernel/kernel-default/src/main/java/io/delta/kernel/data/DefaultJsonRow.java new file mode 100644 index 0000000000..04cc9f6c15 --- /dev/null +++ b/kernel/kernel-default/src/main/java/io/delta/kernel/data/DefaultJsonRow.java @@ -0,0 +1,239 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.data; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import io.delta.kernel.types.ArrayType; +import io.delta.kernel.types.BooleanType; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.IntegerType; +import io.delta.kernel.types.LongType; +import io.delta.kernel.types.MapType; +import io.delta.kernel.types.MixedDataType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructField; +import io.delta.kernel.types.StructType; + +public class DefaultJsonRow implements Row +{ + private final Object[] parsedValues; + private final StructType readSchema; + + public DefaultJsonRow(ObjectNode rootNode, StructType readSchema) + { + this.readSchema = readSchema; + this.parsedValues = new Object[readSchema.length()]; + + for (int i = 0; i < readSchema.length(); i++) { + final StructField field = readSchema.at(i); + final Object parsedValue = decodeField(rootNode, field); + parsedValues[i] = parsedValue; + } + } + + @Override + public StructType getSchema() + { + return readSchema; + } + + @Override + public boolean isNullAt(int ordinal) + { + return parsedValues[ordinal] == null; + } + + @Override + public boolean getBoolean(int ordinal) + { + return (boolean) parsedValues[ordinal]; + } + + @Override + public byte getByte(int ordinal) + { + throw new UnsupportedOperationException("not yet implemented"); + } + + @Override + public short getShort(int ordinal) + { + throw new UnsupportedOperationException("not yet implemented"); + } + + @Override + public int getInt(int ordinal) + { + return (int) parsedValues[ordinal]; + } + + @Override + public long getLong(int ordinal) + { + return (long) parsedValues[ordinal]; + } + + @Override + public float getFloat(int ordinal) + { + throw new UnsupportedOperationException("not yet implemented"); + } + + @Override + public double getDouble(int ordinal) + { + throw new UnsupportedOperationException("not yet implemented"); + } + + @Override + public String getString(int ordinal) + { + return (String) parsedValues[ordinal]; + } + + @Override + public byte[] getBinary(int ordinal) + { + throw new UnsupportedOperationException("not yet implemented"); + } + + @Override + public Row getStruct(int ordinal) + { + return (DefaultJsonRow) parsedValues[ordinal]; + } + + @Override + public List getArray(int ordinal) + { + return (List) parsedValues[ordinal]; + } + + @Override + public Map getMap(int ordinal) + { + return (Map) parsedValues[ordinal]; + } + + private static void throwIfTypeMismatch(String expType, boolean hasExpType, JsonNode jsonNode) + { + if (!hasExpType) { + throw new RuntimeException( + String.format("Couldn't decode %s, expected a %s", jsonNode, expType)); + } + } + + private static Object decodeElement(JsonNode jsonValue, DataType dataType) + { + if (jsonValue.isNull()) { + return null; + } + + if (dataType.equals(MixedDataType.INSTANCE)) { + if (jsonValue.isTextual()) { + return jsonValue.textValue(); + } + else if (jsonValue instanceof ObjectNode) { + return jsonValue.toString(); + } + throwIfTypeMismatch("object or string", false, jsonValue); + } + + if (dataType instanceof BooleanType) { + throwIfTypeMismatch("boolean", jsonValue.isBoolean(), jsonValue); + return jsonValue.booleanValue(); + } + + if (dataType instanceof IntegerType) { + throwIfTypeMismatch( + "integer", jsonValue.isIntegralNumber() && !jsonValue.isLong(), jsonValue); + return jsonValue.intValue(); + } + + if (dataType instanceof LongType) { + throwIfTypeMismatch("long", jsonValue.isIntegralNumber(), jsonValue); + return jsonValue.numberValue().longValue(); + } + + if (dataType instanceof StringType) { + throwIfTypeMismatch("string", jsonValue.isTextual(), jsonValue); + return jsonValue.asText(); + } + + if (dataType instanceof StructType) { + throwIfTypeMismatch("object", jsonValue.isObject(), jsonValue); + return new DefaultJsonRow((ObjectNode) jsonValue, (StructType) dataType); + } + + if (dataType instanceof ArrayType) { + throwIfTypeMismatch("array", jsonValue.isArray(), jsonValue); + final ArrayType arrayType = ((ArrayType) dataType); + final ArrayNode jsonArray = (ArrayNode) jsonValue; + final List output = new ArrayList<>(); + + for (Iterator it = jsonArray.elements(); it.hasNext(); ) { + final JsonNode element = it.next(); + final Object parsedElement = decodeElement(element, arrayType.getElementType()); + output.add(parsedElement); + } + return output; + } + + if (dataType instanceof MapType) { + throwIfTypeMismatch("map", jsonValue.isObject(), jsonValue); + final MapType mapType = (MapType) dataType; + final Iterator> iter = jsonValue.fields(); + final Map output = new HashMap<>(); + + while (iter.hasNext()) { + Map.Entry entry = iter.next(); + String keyParsed = entry.getKey(); + Object valueParsed = decodeElement(entry.getValue(), mapType.getValueType()); + output.put(keyParsed, valueParsed); + } + + return output; + } + + throw new UnsupportedOperationException( + String.format("Unsupported DataType %s for RootNode %s", dataType, jsonValue) + ); + } + + private static Object decodeField(ObjectNode rootNode, StructField field) + { + if (rootNode.get(field.getName()) == null) { + if (field.isNullable()) { + return null; + } + + throw new RuntimeException(String.format( + "Root node at key %s is null but field isn't nullable. Root node: %s", + field.getName(), + rootNode)); + } + + return decodeElement(rootNode.get(field.getName()), field.getDataType()); + } +} diff --git a/kernel/kernel-default/src/main/java/io/delta/kernel/data/DefaultRowBasedColumnarBatch.java b/kernel/kernel-default/src/main/java/io/delta/kernel/data/DefaultRowBasedColumnarBatch.java new file mode 100644 index 0000000000..ed9850905b --- /dev/null +++ b/kernel/kernel-default/src/main/java/io/delta/kernel/data/DefaultRowBasedColumnarBatch.java @@ -0,0 +1,209 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.data; + +import static io.delta.kernel.DefaultKernelUtils.checkArgument; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.StructField; +import io.delta.kernel.types.StructType; + +/** + * {@link ColumnarBatch} wrapper around list of {@link Row} objects. + */ +public class DefaultRowBasedColumnarBatch + implements ColumnarBatch +{ + private final StructType schema; + private final List rows; + private final Map columnIndexToNameMap; + + public DefaultRowBasedColumnarBatch(StructType schema, List rows) + { + this.schema = schema; + this.rows = rows; + this.columnIndexToNameMap = constructColumnIndexMap(schema); + } + + @Override + public StructType getSchema() + { + return schema; + } + + @Override + public int getSize() + { + return rows.size(); + } + + @Override + public ColumnarBatch slice(int start, int end) + { + return null; + } + + @Override + public ColumnVector getColumnVector(int ordinal) + { + String columnName = columnIndexToNameMap.get(ordinal); + StructField field = schema.get(columnName); + return new DefaultColumnVector(field.getDataType(), rows, ordinal); + } + + private static Map constructColumnIndexMap(StructType schema) + { + Map columnIndexToNameMap = new HashMap<>(); + int index = 0; + for (StructField field : schema.fields()) { + columnIndexToNameMap.put(index, field.getName()); + index++; + } + return columnIndexToNameMap; + } + + /** + * Wrapper around list of {@link Row}s to expose the rows as a columnar vector + */ + private static class DefaultColumnVector implements ColumnVector + { + private final DataType dataType; + private final List rows; + private final int columnOrdinal; + + DefaultColumnVector(DataType dataType, List rows, int columnOrdinal) + { + this.dataType = dataType; + this.rows = rows; + this.columnOrdinal = columnOrdinal; + } + + @Override + public DataType getDataType() + { + return dataType; + } + + @Override + public int getSize() + { + return rows.size(); + } + + @Override + public void close() { /* nothing to close */ } + + @Override + public boolean isNullAt(int rowId) + { + assertValidRowId(rowId); + return rows.get(rowId).isNullAt(columnOrdinal); + } + + @Override + public boolean getBoolean(int rowId) + { + assertValidRowId(rowId); + return rows.get(rowId).getBoolean(columnOrdinal); + } + + @Override + public byte getByte(int rowId) + { + assertValidRowId(rowId); + return rows.get(rowId).getByte(columnOrdinal); + } + + @Override + public short getShort(int rowId) + { + assertValidRowId(rowId); + return rows.get(rowId).getShort(columnOrdinal); + } + + @Override + public int getInt(int rowId) + { + assertValidRowId(rowId); + return rows.get(rowId).getInt(columnOrdinal); + } + + @Override + public long getLong(int rowId) + { + assertValidRowId(rowId); + return rows.get(rowId).getLong(columnOrdinal); + } + + @Override + public float getFloat(int rowId) + { + assertValidRowId(rowId); + return rows.get(rowId).getFloat(columnOrdinal); + } + + @Override + public double getDouble(int rowId) + { + assertValidRowId(rowId); + return rows.get(rowId).getDouble(columnOrdinal); + } + + @Override + public String getString(int rowId) + { + assertValidRowId(rowId); + return rows.get(rowId).getString(columnOrdinal); + } + + @Override + public byte[] getBinary(int rowId) + { + assertValidRowId(rowId); + return rows.get(rowId).getBinary(columnOrdinal); + } + + @Override + public Map getMap(int rowId) + { + assertValidRowId(rowId); + return rows.get(rowId).getMap(columnOrdinal); + } + + @Override + public Row getStruct(int rowId) + { + assertValidRowId(rowId); + return rows.get(rowId).getStruct(columnOrdinal); + } + + @Override + public List getArray(int rowId) + { + assertValidRowId(rowId); + return rows.get(rowId).getArray(columnOrdinal); + } + + private void assertValidRowId(int rowId) + { + checkArgument(rowId < rows.size(), + "Invalid rowId: " + rowId + ", max allowed rowId is: " + (rows.size() - 1)); + } + } +} diff --git a/kernel/kernel-default/src/main/java/io/delta/kernel/data/vector/AbstractColumnVector.java b/kernel/kernel-default/src/main/java/io/delta/kernel/data/vector/AbstractColumnVector.java new file mode 100644 index 0000000000..a9711fb1b2 --- /dev/null +++ b/kernel/kernel-default/src/main/java/io/delta/kernel/data/vector/AbstractColumnVector.java @@ -0,0 +1,171 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.data.vector; + +import static io.delta.kernel.DefaultKernelUtils.checkArgument; +import java.util.List; +import java.util.Map; +import static java.util.Objects.requireNonNull; +import java.util.Optional; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.Row; +import io.delta.kernel.types.DataType; + +/** + * Abstract implementation of {@link ColumnVector} that provides the default functionality + * common to most of the specific data type {@link ColumnVector} implementations. + */ +public abstract class AbstractColumnVector + implements ColumnVector +{ + private final int size; + private final DataType dataType; + private final Optional nullability; + + protected AbstractColumnVector(int size, DataType dataType, Optional nullability) + { + checkArgument(size >= 0, "invalid size: %s", size); + this.size = size; + this.dataType = requireNonNull(dataType); + this.nullability = requireNonNull(nullability); + } + + @Override + public DataType getDataType() + { + return dataType; + } + + @Override + public int getSize() + { + return size; + } + + @Override + public void close() + { + // By default, nothing to close, if the implementation has any resources to release + // it can override it + } + + /** + * Is the value at given {@code rowId} index is null? + * + * @param rowId + * @return + */ + @Override + public boolean isNullAt(int rowId) + { + checkValidRowId(rowId); + return !nullability.isPresent() || nullability.get()[rowId]; + } + + @Override + public boolean getBoolean(int rowId) + { + throw unsupportedDataAccessException("boolean"); + } + + @Override + public byte getByte(int rowId) + { + throw unsupportedDataAccessException("byte"); + } + + @Override + public short getShort(int rowId) + { + throw unsupportedDataAccessException("short"); + } + + @Override + public int getInt(int rowId) + { + throw unsupportedDataAccessException("int"); + } + + @Override + public long getLong(int rowId) + { + throw unsupportedDataAccessException("long"); + } + + @Override + public float getFloat(int rowId) + { + throw unsupportedDataAccessException("float"); + } + + @Override + public double getDouble(int rowId) + { + throw unsupportedDataAccessException("double"); + } + + @Override + public byte[] getBinary(int rowId) + { + throw unsupportedDataAccessException("binary"); + } + + @Override + public String getString(int rowId) + { + throw unsupportedDataAccessException("string"); + } + + @Override + public Map getMap(int rowId) + { + throw unsupportedDataAccessException("map"); + } + + @Override + public Row getStruct(int rowId) + { + throw unsupportedDataAccessException("struct"); + } + + @Override + public List getArray(int rowId) + { + throw unsupportedDataAccessException("array"); + } + + protected UnsupportedOperationException unsupportedDataAccessException(String accessType) + { + String msg = String.format( + "Trying to access a `%s` value from vector of type `%s`", + accessType, + getDataType()); + throw new UnsupportedOperationException(msg); + } + + /** + * Helper method that make sure the given {@code rowId} position is valid in this vector + * + * @param rowId + */ + protected void checkValidRowId(int rowId) + { + if (rowId < 0 || rowId >= size) { + throw new IllegalArgumentException("invalid row access: " + rowId); + } + } +} diff --git a/kernel/kernel-default/src/main/java/io/delta/kernel/data/vector/DefaultBooleanVector.java b/kernel/kernel-default/src/main/java/io/delta/kernel/data/vector/DefaultBooleanVector.java new file mode 100644 index 0000000000..a9afa1d158 --- /dev/null +++ b/kernel/kernel-default/src/main/java/io/delta/kernel/data/vector/DefaultBooleanVector.java @@ -0,0 +1,69 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.data.vector; + +import java.util.Optional; +import static java.util.Objects.requireNonNull; + +import io.delta.kernel.types.BooleanType; +import static io.delta.kernel.DefaultKernelUtils.checkArgument; + +/** + * {@link io.delta.kernel.data.ColumnVector} implementation for boolean type data. + */ +public class DefaultBooleanVector + extends AbstractColumnVector +{ + private final boolean[] values; + + /** + * Create an instance of {@link io.delta.kernel.data.ColumnVector} for boolean type. + * + * @param size number of elements in the vector. + * @param nullability Optional array of nullability value for each element in the vector. + * All values in the vector are considered non-null when parameter is empty. + * @param values column vector values. + */ + public DefaultBooleanVector(int size, Optional nullability, boolean[] values) + { + super(size, BooleanType.INSTANCE, nullability); + this.values = requireNonNull(values, "values is null"); + checkArgument(values.length >= 0, "invalid vector size: %s", values.length); + checkArgument(values.length >= size, + "invalid number of values (%s) for given size (%s)", values.length, size); + if (nullability.isPresent()) { + checkArgument(values.length == nullability.get().length, + "vector element components are not of same size" + + "value array size = %s, nullability array size = %s", + values.length, nullability.get().length + ); + } + } + + /** + * Get the value at given {@code rowId}. The return value is undefined and can be + * anything, if the slot for {@code rowId} is null. + * + * @param rowId + * @return + */ + @Override + public boolean getBoolean(int rowId) + { + checkValidRowId(rowId); + return values[rowId]; + } +} diff --git a/kernel/kernel-default/src/main/java/io/delta/kernel/data/vector/DefaultConstantVector.java b/kernel/kernel-default/src/main/java/io/delta/kernel/data/vector/DefaultConstantVector.java new file mode 100644 index 0000000000..9729972f0d --- /dev/null +++ b/kernel/kernel-default/src/main/java/io/delta/kernel/data/vector/DefaultConstantVector.java @@ -0,0 +1,135 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.data.vector; + +import java.util.List; +import java.util.Map; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.Row; +import io.delta.kernel.types.DataType; + +public class DefaultConstantVector + implements ColumnVector +{ + private final DataType dataType; + private final int numRows; + private final Object value; + + public DefaultConstantVector(DataType dataType, int numRows, Object value) + { + // TODO: Validate datatype and value object type + this.dataType = dataType; + this.numRows = numRows; + this.value = value; + } + + @Override + public DataType getDataType() + { + return dataType; + } + + @Override + public int getSize() + { + return numRows; + } + + @Override + public void close() + { + // nothing to close + } + + @Override + public boolean isNullAt(int rowId) + { + return value == null; + } + + @Override + public boolean getBoolean(int rowId) + { + return (boolean) value; + } + + @Override + public byte getByte(int rowId) + { + return (byte) value; + } + + @Override + public short getShort(int rowId) + { + return (short) value; + } + + @Override + public int getInt(int rowId) + { + return (int) value; + } + + @Override + public long getLong(int rowId) + { + return (long) value; + } + + @Override + public float getFloat(int rowId) + { + return (float) value; + } + + @Override + public double getDouble(int rowId) + { + return (double) value; + } + + @Override + public byte[] getBinary(int rowId) + { + return (byte[]) value; + } + + @Override + public String getString(int rowId) + { + return (String) value; + } + + @Override + public Map getMap(int rowId) + { + return (Map) value; + } + + @Override + public Row getStruct(int rowId) + { + return (Row) value; + } + + @Override + public List getArray(int rowId) + { + return (List) value; + } +} diff --git a/kernel/kernel-default/src/main/java/io/delta/kernel/data/vector/DefaultIntVector.java b/kernel/kernel-default/src/main/java/io/delta/kernel/data/vector/DefaultIntVector.java new file mode 100644 index 0000000000..2dfdfa5584 --- /dev/null +++ b/kernel/kernel-default/src/main/java/io/delta/kernel/data/vector/DefaultIntVector.java @@ -0,0 +1,73 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.data.vector; + +import static io.delta.kernel.DefaultKernelUtils.checkArgument; +import static java.util.Objects.requireNonNull; +import java.util.Optional; + +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.DateType; +import io.delta.kernel.types.IntegerType; + +/** + * {@link io.delta.kernel.data.ColumnVector} implementation for integer type data. + */ +public class DefaultIntVector + extends AbstractColumnVector +{ + private final int[] values; + + /** + * Create an instance of {@link io.delta.kernel.data.ColumnVector} for integer type. + * + * @param size number of elements in the vector. + * @param nullability Optional array of nullability value for each element in the vector. + * All values in the vector are considered non-null when parameter is empty. + * @param values column vector values. + */ + public DefaultIntVector( + DataType dataType, int size, Optional nullability, int[] values) + { + super(size, dataType, nullability); + checkArgument(dataType instanceof IntegerType || dataType instanceof DateType); + this.values = requireNonNull(values, "values is null"); + checkArgument(values.length >= 0, "invalid vector size: %s", values.length); + checkArgument(values.length >= size, + "invalid number of values (%s) for given size (%s)", values.length, size); + if (nullability.isPresent()) { + checkArgument(values.length == nullability.get().length, + "vector element components are not of same size" + + "value array size = %s, nullability array size = %s", + values.length, nullability.get().length + ); + } + } + + /** + * Get the value at given {@code rowId}. The return value is undefined and can be + * anything, if the slot for {@code rowId} is null. + * + * @param rowId + * @return + */ + @Override + public int getInt(int rowId) + { + checkValidRowId(rowId); + return values[rowId]; + } +} diff --git a/kernel/kernel-default/src/main/java/io/delta/kernel/data/vector/DefaultLongVector.java b/kernel/kernel-default/src/main/java/io/delta/kernel/data/vector/DefaultLongVector.java new file mode 100644 index 0000000000..2f9394c190 --- /dev/null +++ b/kernel/kernel-default/src/main/java/io/delta/kernel/data/vector/DefaultLongVector.java @@ -0,0 +1,69 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.data.vector; + +import static io.delta.kernel.DefaultKernelUtils.checkArgument; +import static java.util.Objects.requireNonNull; +import java.util.Optional; + +import io.delta.kernel.types.LongType; + +/** + * {@link io.delta.kernel.data.ColumnVector} implementation for long type data. + */ +public class DefaultLongVector + extends AbstractColumnVector +{ + private final long[] values; + + /** + * Create an instance of {@link io.delta.kernel.data.ColumnVector} for long type. + * + * @param size number of elements in the vector. + * @param nullability Optional array of nullability value for each element in the vector. + * All values in the vector are considered non-null when parameter is empty. + * @param values column vector values. + */ + public DefaultLongVector(int size, Optional nullability, long[] values) + { + super(size, LongType.INSTANCE, nullability); + this.values = requireNonNull(values, "values is null"); + checkArgument(values.length >= 0, "invalid vector size: %s", values.length); + checkArgument(values.length >= size, + "invalid number of values (%s) for given size (%s)", values.length, size); + if (nullability.isPresent()) { + checkArgument(values.length == nullability.get().length, + "vector element components are not of same size" + + "value array size = %s, nullability array size = %s", + values.length, nullability.get().length + ); + } + } + + /** + * Get the value at given {@code rowId}. The return value is undefined and can be + * anything, if the slot for {@code rowId} is null. + * + * @param rowId + * @return + */ + @Override + public long getLong(int rowId) + { + checkValidRowId(rowId); + return values[rowId]; + } +} diff --git a/kernel/kernel-default/src/test/java/io/delta/kernel/client/TestDefaultExpressionHandler.java b/kernel/kernel-default/src/test/java/io/delta/kernel/client/TestDefaultExpressionHandler.java new file mode 100644 index 0000000000..4a89c70444 --- /dev/null +++ b/kernel/kernel-default/src/test/java/io/delta/kernel/client/TestDefaultExpressionHandler.java @@ -0,0 +1,261 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.client; + +import static io.delta.kernel.DefaultKernelUtils.daysSinceEpoch; +import java.sql.Date; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import org.junit.Test; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.DefaultColumnarBatch; +import io.delta.kernel.data.vector.DefaultIntVector; +import io.delta.kernel.data.vector.DefaultLongVector; +import io.delta.kernel.expressions.And; +import io.delta.kernel.expressions.Column; +import io.delta.kernel.expressions.EqualTo; +import io.delta.kernel.expressions.Expression; +import io.delta.kernel.expressions.Literal; +import io.delta.kernel.types.BinaryType; +import io.delta.kernel.types.BooleanType; +import io.delta.kernel.types.ByteType; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.DateType; +import io.delta.kernel.types.DoubleType; +import io.delta.kernel.types.FloatType; +import io.delta.kernel.types.IntegerType; +import io.delta.kernel.types.LongType; +import io.delta.kernel.types.ShortType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructType; +import io.delta.kernel.types.TimestampType; + +public class TestDefaultExpressionHandler +{ + /** + * Evaluate literal expressions. This is used to populate the partition column vectors. + */ + @Test + public void evalLiterals() + { + StructType inputSchema = new StructType(); + ColumnVector[] data = new ColumnVector[0]; + + List testCases = new ArrayList<>(); + testCases.add(Literal.of(true)); + testCases.add(Literal.of(false)); + testCases.add(Literal.ofNull(BooleanType.INSTANCE)); + testCases.add(Literal.of((byte) 24)); + testCases.add(Literal.ofNull(ByteType.INSTANCE)); + testCases.add(Literal.of((short) 876)); + testCases.add(Literal.ofNull(ShortType.INSTANCE)); + testCases.add(Literal.of(2342342)); + testCases.add(Literal.ofNull(IntegerType.INSTANCE)); + testCases.add(Literal.of(234234223L)); + testCases.add(Literal.ofNull(LongType.INSTANCE)); + testCases.add(Literal.of(23423.4223f)); + testCases.add(Literal.ofNull(FloatType.INSTANCE)); + testCases.add(Literal.of(23423.422233d)); + testCases.add(Literal.ofNull(DoubleType.INSTANCE)); + testCases.add(Literal.of("string_val")); + testCases.add(Literal.ofNull(StringType.INSTANCE)); + testCases.add(Literal.of("binary_val".getBytes())); + testCases.add(Literal.ofNull(BinaryType.INSTANCE)); + testCases.add(Literal.of(new Date(234234234))); + testCases.add(Literal.ofNull(DateType.INSTANCE)); + testCases.add(Literal.of(new Timestamp(2342342342232L))); + testCases.add(Literal.ofNull(TimestampType.INSTANCE)); + + ColumnarBatch[] inputBatches = new ColumnarBatch[] { + new DefaultColumnarBatch(0, inputSchema, data), + new DefaultColumnarBatch(25, inputSchema, data), + new DefaultColumnarBatch(128, inputSchema, data) + }; + + for (Literal expression : testCases) { + DataType outputDataType = expression.dataType(); + + for (ColumnarBatch inputBatch : inputBatches) { + ColumnVector outputVector = eval(inputSchema, inputBatch, expression); + assertEquals(inputBatch.getSize(), outputVector.getSize()); + assertEquals(outputDataType, outputVector.getDataType()); + for (int rowId = 0; rowId < outputVector.getSize(); rowId++) { + if (expression.value() == null) { + assertTrue(outputVector.isNullAt(rowId)); + continue; + } + Object expRowValue = expression.value(); + if (outputDataType instanceof BooleanType) { + assertEquals(expRowValue, outputVector.getBoolean(rowId)); + } + else if (outputDataType instanceof ByteType) { + assertEquals(expRowValue, outputVector.getByte(rowId)); + } + else if (outputDataType instanceof ShortType) { + assertEquals(expRowValue, outputVector.getShort(rowId)); + } + else if (outputDataType instanceof IntegerType) { + assertEquals(expRowValue, outputVector.getInt(rowId)); + } + else if (outputDataType instanceof LongType) { + assertEquals(expRowValue, outputVector.getLong(rowId)); + } + else if (outputDataType instanceof FloatType) { + assertEquals(expRowValue, outputVector.getFloat(rowId)); + } + else if (outputDataType instanceof DoubleType) { + assertEquals(expRowValue, outputVector.getDouble(rowId)); + } + else if (outputDataType instanceof StringType) { + assertEquals(expRowValue, outputVector.getString(rowId)); + } + else if (outputDataType instanceof BinaryType) { + assertEquals(expRowValue, outputVector.getBinary(rowId)); + } + else if (outputDataType instanceof DateType) { + assertEquals( + daysSinceEpoch((Date) expRowValue), outputVector.getInt(rowId)); + } + else if (outputDataType instanceof TimestampType) { + Timestamp timestamp = (Timestamp) expRowValue; + long micros = timestamp.getTime() * 1000; + assertEquals(micros, outputVector.getLong(rowId)); + } + else { + throw new UnsupportedOperationException( + "unsupported output type encountered: " + outputDataType); + } + } + } + } + } + + @Test + public void evalBooleanExpressionSimple() + { + Expression expression = new EqualTo( + new Column(0, "intType", IntegerType.INSTANCE), + Literal.of(3)); + + for (int size : Arrays.asList(26, 234, 567)) { + StructType inputSchema = new StructType() + .add("intType", IntegerType.INSTANCE); + ColumnVector[] data = new ColumnVector[] { + intVector(size) + }; + + ColumnarBatch inputBatch = new DefaultColumnarBatch(size, inputSchema, data); + + ColumnVector output = eval(inputSchema, inputBatch, expression); + for (int rowId = 0; rowId < size; rowId++) { + if (data[0].isNullAt(rowId)) { + // expect the output to be null as well + assertTrue(output.isNullAt(rowId)); + } + else { + assertFalse(output.isNullAt(rowId)); + boolean expValue = rowId % 7 == 3; + assertEquals(expValue, output.getBoolean(rowId)); + } + } + } + } + + @Test + public void evalBooleanExpressionComplex() + { + Expression expression = new And( + new EqualTo(new Column(0, "intType", IntegerType.INSTANCE), Literal.of(3)), + new EqualTo(new Column(1, "longType", LongType.INSTANCE), Literal.of(4L)) + ); + + for (int size : Arrays.asList(26, 234, 567)) { + StructType inputSchema = new StructType() + .add("intType", IntegerType.INSTANCE) + .add("longType", LongType.INSTANCE); + ColumnVector[] data = new ColumnVector[] { + intVector(size), + longVector(size), + }; + + ColumnarBatch inputBatch = new DefaultColumnarBatch(size, inputSchema, data); + + ColumnVector output = eval(inputSchema, inputBatch, expression); + for (int rowId = 0; rowId < size; rowId++) { + if (data[0].isNullAt(rowId) || data[1].isNullAt(rowId)) { + // expect the output to be null as well + assertTrue(output.isNullAt(rowId)); + } + else { + assertFalse(output.isNullAt(rowId)); + boolean expValue = (rowId % 7 == 3) && (rowId * 200L / 87 == 4); + assertEquals(expValue, output.getBoolean(rowId)); + } + } + } + } + + private static ColumnVector eval( + StructType inputSchema, ColumnarBatch input, Expression expression) + { + return new DefaultExpressionHandler() + .getEvaluator(inputSchema, expression) + .eval(input); + } + + private static ColumnVector intVector(int size) + { + int[] values = new int[size]; + boolean[] nullability = new boolean[size]; + + for (int rowId = 0; rowId < size; rowId++) { + if (rowId % 5 == 0) { + nullability[rowId] = true; + } + else { + values[rowId] = rowId % 7; + } + } + + return new DefaultIntVector( + IntegerType.INSTANCE, size, Optional.of(nullability), values); + } + + private static ColumnVector longVector(int size) + { + long[] values = new long[size]; + boolean[] nullability = new boolean[size]; + + for (int rowId = 0; rowId < size; rowId++) { + if (rowId % 5 == 0) { + nullability[rowId] = true; + } + else { + values[rowId] = rowId * 200L % 87; + } + } + + return new DefaultLongVector(size, Optional.of(nullability), values); + } +} diff --git a/kernel/kernel-default/src/test/java/io/delta/kernel/client/TestDefaultFileSystemClient.java b/kernel/kernel-default/src/test/java/io/delta/kernel/client/TestDefaultFileSystemClient.java new file mode 100644 index 0000000000..4443815d71 --- /dev/null +++ b/kernel/kernel-default/src/test/java/io/delta/kernel/client/TestDefaultFileSystemClient.java @@ -0,0 +1,55 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.client; + +import static io.delta.kernel.utils.DefaultKernelTestUtils.getTestResourceFilePath; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import static org.junit.Assert.assertEquals; +import org.junit.Test; + +import io.delta.kernel.fs.FileStatus; +import io.delta.kernel.utils.CloseableIterator; + +public class TestDefaultFileSystemClient +{ + @Test + public void listFrom() throws Exception + { + String basePath = getTestResourceFilePath("json-files"); + String listFrom = getTestResourceFilePath("json-files/2.json"); + + List actListOutput = new ArrayList<>(); + try (CloseableIterator files = fsClient().listFrom(listFrom)) { + while (files.hasNext()) { + actListOutput.add(files.next().getPath()); + } + } + + List expListOutput = Arrays.asList( + "file:" + basePath + "/2.json", + "file:" + basePath + "/3.json"); + + assertEquals(expListOutput, actListOutput); + } + + private static DefaultFileSystemClient fsClient() + { + return new DefaultFileSystemClient(new Configuration()); + } +} diff --git a/kernel/kernel-default/src/test/java/io/delta/kernel/client/TestDefaultJsonHandler.java b/kernel/kernel-default/src/test/java/io/delta/kernel/client/TestDefaultJsonHandler.java new file mode 100644 index 0000000000..405d660b44 --- /dev/null +++ b/kernel/kernel-default/src/test/java/io/delta/kernel/client/TestDefaultJsonHandler.java @@ -0,0 +1,181 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.client; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import static io.delta.kernel.utils.DefaultKernelTestUtils.getTestResourceFilePath; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import static org.junit.Assert.assertEquals; +import org.junit.Test; + +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.DefaultJsonRow; +import io.delta.kernel.data.FileDataReadResult; +import io.delta.kernel.data.Row; +import io.delta.kernel.expressions.Literal; +import io.delta.kernel.fs.FileStatus; +import io.delta.kernel.types.BooleanType; +import io.delta.kernel.types.LongType; +import io.delta.kernel.types.MapType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.CloseableIterator; +import io.delta.kernel.utils.Utils; + +public class TestDefaultJsonHandler +{ + private static final JsonHandler JSON_HANDLER = new DefaultJsonHandler(new Configuration()); + private static final FileSystemClient FS_CLIENT = + new DefaultFileSystemClient(new Configuration()); + + @Test + public void contextualizeFiles() + throws Exception + { + try (CloseableIterator inputScanFiles = testFiles(); + CloseableIterator fileReadContexts = + JSON_HANDLER.contextualizeFileReads(testFiles(), Literal.TRUE)) { + assertEquals(inputScanFiles.hasNext(), fileReadContexts.hasNext()); + if (inputScanFiles.hasNext()) { + Row inputScanFile = inputScanFiles.next(); + FileReadContext outputScanContext = fileReadContexts.next(); + compareScanFileRows(inputScanFile, outputScanContext.getScanFileRow()); + } + } + } + + @Test + public void readJsonFiles() + throws Exception + { + CloseableIterator data = + JSON_HANDLER.readJsonFiles( + JSON_HANDLER.contextualizeFileReads(testFiles(), Literal.TRUE), + new StructType() + .add("path", StringType.INSTANCE) + .add("size", LongType.INSTANCE) + .add("dataChange", BooleanType.INSTANCE) + ); + + List actPaths = new ArrayList<>(); + List actSizes = new ArrayList<>(); + List actDataChanges = new ArrayList<>(); + while (data.hasNext()) { + ColumnarBatch dataBatch = data.next().getData(); + CloseableIterator dataBatchRows = dataBatch.getRows(); + while (dataBatchRows.hasNext()) { + Row row = dataBatchRows.next(); + actPaths.add(row.getString(0)); + actSizes.add(row.getLong(1)); + actDataChanges.add(row.getBoolean(2)); + } + } + + List expPaths = Arrays.asList( + "part-00000-d83dafd8-c344-49f0-ab1c-acd944e32493-c000.snappy.parquet", + "part-00000-cb078bc1-0aeb-46ed-9cf8-74a843b32c8c-c000.snappy.parquet", + "part-00001-9bf4b8f8-1b95-411b-bf10-28dc03aa9d2f-c000.snappy.parquet", + "part-00000-0441e99a-c421-400e-83a1-212aa6c84c73-c000.snappy.parquet", + "part-00001-34c8c673-3f44-4fa7-b94e-07357ec28a7d-c000.snappy.parquet", + "part-00000-842017c2-3e02-44b5-a3d6-5b9ae1745045-c000.snappy.parquet", + "part-00001-e62ca5a1-923c-4ee6-998b-c61d1cfb0b1c-c000.snappy.parquet" + ); + List expSizes = Arrays.asList(348L, 687L, 705L, 650L, 650L, 649L, 649L); + List expDataChanges = Arrays.asList(true, true, true, true, true, true, true); + + assertEquals(expPaths, actPaths); + assertEquals(expSizes, actSizes); + assertEquals(actDataChanges, expDataChanges); + } + + @Test + public void parseJsonContent() + throws Exception + { + String input = + "{" + + " \"path\":\"part-00000-d83dafd8-c344-49f0-ab1c-acd944e32493-c000.snappy.parquet\", " + + " \"partitionValues\":{\"p1\" : \"0\", \"p2\" : \"str\"}," + + " \"size\":348," + + " \"modificationTime\":1603723974000, " + + " \"dataChange\":true" + + " }"; + StructType readSchema = new StructType() + .add("path", StringType.INSTANCE) + .add("partitionValues", + new MapType(StringType.INSTANCE, StringType.INSTANCE, false)) + .add("size", LongType.INSTANCE) + .add("dataChange", BooleanType.INSTANCE); + + ColumnarBatch batch = + JSON_HANDLER.parseJson(Utils.singletonColumnVector(input), readSchema); + assertEquals(1, batch.getSize()); + + try (CloseableIterator rows = batch.getRows()) { + Row row = rows.next(); + assertEquals( + "part-00000-d83dafd8-c344-49f0-ab1c-acd944e32493-c000.snappy.parquet", + row.getString(0) + ); + + Map expPartitionValues = new HashMap() {{ + put("p1", "0"); + put("p2", "str"); + }}; + assertEquals(expPartitionValues, row.getMap(1)); + assertEquals(348L, row.getLong(2)); + assertEquals(true, row.getBoolean(3)); + } + } + + private static CloseableIterator testFiles() + throws Exception + { + String listFrom = getTestResourceFilePath("json-files/1.json"); + CloseableIterator list = FS_CLIENT.listFrom(listFrom); + return list.map(fileStatus -> + new DefaultJsonRow( + addFileJsonFromPath(fileStatus.getPath()), + new StructType() + .add("path", StringType.INSTANCE) + .add("dataChange", BooleanType.INSTANCE) + .add("size", LongType.INSTANCE) + ) + ); + } + + private static final ObjectNode addFileJsonFromPath(String path) + { + ObjectMapper objectMapper = new ObjectMapper(); + ObjectNode object = objectMapper.createObjectNode(); + object.put("path", path); + object.put("dataChange", true); + object.put("size", 234L); + return object; + } + + private static void compareScanFileRows(Row expected, Row actual) + { + // basically compare the paths + assertEquals(expected.getString(0), actual.getString(0)); + } +} diff --git a/kernel/kernel-default/src/test/java/io/delta/kernel/utils/DefaultKernelTestUtils.java b/kernel/kernel-default/src/test/java/io/delta/kernel/utils/DefaultKernelTestUtils.java new file mode 100644 index 0000000000..83cf32b014 --- /dev/null +++ b/kernel/kernel-default/src/test/java/io/delta/kernel/utils/DefaultKernelTestUtils.java @@ -0,0 +1,25 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.utils; + +public class DefaultKernelTestUtils +{ + private DefaultKernelTestUtils() {} + + public static String getTestResourceFilePath(String resourcePath) { + return DefaultKernelTestUtils.class.getClassLoader().getResource(resourcePath).getFile(); + } +} \ No newline at end of file diff --git a/kernel/kernel-default/src/test/resources/json-files/1.json b/kernel/kernel-default/src/test/resources/json-files/1.json new file mode 100644 index 0000000000..04d5431a71 --- /dev/null +++ b/kernel/kernel-default/src/test/resources/json-files/1.json @@ -0,0 +1,3 @@ +{"path":"part-00000-d83dafd8-c344-49f0-ab1c-acd944e32493-c000.snappy.parquet","partitionValues":{},"size":348,"modificationTime":1603723974000,"dataChange":true} +{"path":"part-00000-cb078bc1-0aeb-46ed-9cf8-74a843b32c8c-c000.snappy.parquet","partitionValues":{},"size":687,"modificationTime":1603723972000,"dataChange":true} +{"path":"part-00001-9bf4b8f8-1b95-411b-bf10-28dc03aa9d2f-c000.snappy.parquet","partitionValues":{},"size":705,"modificationTime":1603723972000,"dataChange":true} \ No newline at end of file diff --git a/kernel/kernel-default/src/test/resources/json-files/2.json b/kernel/kernel-default/src/test/resources/json-files/2.json new file mode 100644 index 0000000000..d2af04dc17 --- /dev/null +++ b/kernel/kernel-default/src/test/resources/json-files/2.json @@ -0,0 +1,2 @@ +{"path":"part-00000-0441e99a-c421-400e-83a1-212aa6c84c73-c000.snappy.parquet","partitionValues":{},"size":650,"modificationTime":1603723967000,"dataChange":true} +{"path":"part-00001-34c8c673-3f44-4fa7-b94e-07357ec28a7d-c000.snappy.parquet","partitionValues":{},"size":650,"modificationTime":1603723967000,"dataChange":true} \ No newline at end of file diff --git a/kernel/kernel-default/src/test/resources/json-files/3.json b/kernel/kernel-default/src/test/resources/json-files/3.json new file mode 100644 index 0000000000..f7a916d083 --- /dev/null +++ b/kernel/kernel-default/src/test/resources/json-files/3.json @@ -0,0 +1,2 @@ +{"path":"part-00000-842017c2-3e02-44b5-a3d6-5b9ae1745045-c000.snappy.parquet","partitionValues":{},"size":649,"modificationTime":1603723970000,"dataChange":true} +{"path":"part-00001-e62ca5a1-923c-4ee6-998b-c61d1cfb0b1c-c000.snappy.parquet","partitionValues":{},"size":649,"modificationTime":1603723970000,"dataChange":true} \ No newline at end of file