Skip to content

Commit

Permalink
[Kernel] Default Parquet reader implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
vkorukanti committed Jun 21, 2023
1 parent cb89436 commit ade45e0
Show file tree
Hide file tree
Showing 26 changed files with 3,307 additions and 6 deletions.
6 changes: 1 addition & 5 deletions kernel/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,13 @@ lazy val kernelDefault = (project in file("kernel-default"))
scalaStyleSettings,
releaseSettings,
libraryDependencies ++= Seq(
"org.apache.hadoop" % "hadoop-client-api" % hadoopVersion, // Configuration, Path
"org.apache.hadoop" % "hadoop-client-runtime" % hadoopVersion, // Configuration, Path
"io.delta" % "delta-storage" % deltaStorageVersion, // LogStore
"com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5", // ObjectMapper
"org.apache.parquet" % "parquet-hadoop" % "1.12.3",

"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"io.delta" %% "delta-core" % deltaSparkVersion % "test",
"org.apache.spark" %% "spark-sql" % sparkVersion % "test", // SparkSession
"org.apache.spark" %% "spark-sql" % sparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % sparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-catalyst" % sparkVersion % "test" classifier "tests",
"junit" % "junit" % "4.11" % "test",
"com.novocode" % "junit-interface" % "0.11" % "test"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
import java.util.function.Consumer;
import java.util.function.Function;

public interface CloseableIterator<T> extends Iterator<T>, Closeable {
public interface CloseableIterator<T> extends Iterator<T>, Closeable
{
default <U> CloseableIterator<U> map(Function<T, U> mapper) {
CloseableIterator<T> delegate = this;
return new CloseableIterator<U>() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel;

import java.util.ArrayList;
import java.util.List;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;

import io.delta.kernel.types.DataType;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;

public class DefaultKernelUtils
{
    private DefaultKernelUtils() {}

    /**
     * Given the file schema in Parquet file and selected columns by Delta, return
     * a subschema of the file schema.
     *
     * @param fileSchema Schema of the Parquet file.
     * @param deltaType Delta schema containing the selected columns.
     * @return a {@link MessageType} containing only the requested columns (and, for struct
     *         columns, only the requested subfields).
     * @throws java.util.NoSuchElementException if none of the requested columns exist in
     *         {@code fileSchema}.
     */
    public static final MessageType pruneSchema(
        MessageType fileSchema, // parquet
        StructType deltaType) // delta-core
    {
        // TODO: Handle the case where the column is not in Parquet file
        return deltaType.fields().stream()
            .map(column -> {
                // MessageType extends GroupType, so the shared field lookup applies here.
                Type type = findFieldType(fileSchema, column);
                if (type == null) {
                    return null;
                }
                Type prunedSubfields = pruneSubfields(type, column.getDataType());
                return new MessageType(column.getName(), prunedSubfields);
            })
            .filter(type -> type != null)
            .reduce(MessageType::union)
            .get();
    }

    /**
     * Prune a Parquet group type down to the subfields selected in the given Delta struct
     * type. Non-struct types are returned unchanged.
     *
     * @param type Parquet type of the column (a {@link GroupType} when
     *        {@code deltaDatatype} is a struct).
     * @param deltaDatatype Delta data type of the column.
     * @return the pruned Parquet type. Missing subfields are currently added as
     *         {@code null} entries (see TODO below).
     */
    private static Type pruneSubfields(Type type, DataType deltaDatatype)
    {
        if (!(deltaDatatype instanceof StructType)) {
            // there is no pruning for non-struct types
            return type;
        }

        GroupType groupType = (GroupType) type;
        StructType deltaStructType = (StructType) deltaDatatype;
        List<Type> newParquetSubFields = new ArrayList<>();
        for (StructField subField : deltaStructType.fields()) {
            // Case-sensitive match first, then case-insensitive fallback; returns null
            // when the subfield is absent from the file. (GroupType.getType throws on a
            // missing field, so it cannot be used for the existence check, and the old
            // fallback loop mistakenly assigned the enclosing group type instead of the
            // matched subfield.)
            Type parquetSubFieldType = findFieldType(groupType, subField);
            // TODO: handle missing subfields explicitly instead of adding null
            newParquetSubFields.add(parquetSubFieldType);
        }
        return groupType.withNewFields(newParquetSubFields);
    }

    /**
     * Precondition-style validation that throws {@link IllegalArgumentException}.
     *
     * @param isValid {@code true} if valid, {@code false} if an exception should be thrown
     * @throws IllegalArgumentException if {@code isValid} is false
     */
    public static void checkArgument(boolean isValid)
        throws IllegalArgumentException
    {
        if (!isValid) {
            throw new IllegalArgumentException();
        }
    }

    /**
     * Precondition-style validation that throws {@link IllegalArgumentException}.
     *
     * @param isValid {@code true} if valid, {@code false} if an exception should be thrown
     * @param message A String message for the exception.
     * @throws IllegalArgumentException if {@code isValid} is false
     */
    public static void checkArgument(boolean isValid, String message)
        throws IllegalArgumentException
    {
        if (!isValid) {
            throw new IllegalArgumentException(message);
        }
    }

    /**
     * Precondition-style validation that throws {@link IllegalArgumentException}.
     *
     * @param isValid {@code true} if valid, {@code false} if an exception should be thrown
     * @param message A String message for the exception.
     * @param args Objects used to fill in {@code %s} placeholders in the message
     * @throws IllegalArgumentException if {@code isValid} is false
     */
    public static void checkArgument(boolean isValid, String message, Object... args)
        throws IllegalArgumentException
    {
        if (!isValid) {
            // String.valueOf guards against a null message before formatting.
            throw new IllegalArgumentException(
                String.format(String.valueOf(message), args));
        }
    }

    /**
     * Precondition-style validation that throws {@link IllegalStateException}.
     *
     * @param isValid {@code true} if valid, {@code false} if an exception should be thrown
     * @param message A String message for the exception.
     * @throws IllegalStateException if {@code isValid} is false
     */
    public static void checkState(boolean isValid, String message)
        throws IllegalStateException
    {
        if (!isValid) {
            throw new IllegalStateException(message);
        }
    }

    /**
     * Search in the {@code groupType} for the Parquet type equivalent to the given
     * {@code field}. The lookup is case-sensitive first, falling back to a
     * case-insensitive scan (Hive lower-cases column names while Parquet is
     * case-sensitive).
     *
     * @param groupType Parquet group type coming from the file schema.
     * @param field Sub field given as Delta Kernel's {@link StructField}
     * @return {@link Type} of the Parquet field. Returns {@code null}, if not found.
     */
    public static Type findFieldType(GroupType groupType, StructField field)
    {
        // TODO: Need a way to search by id once we start supporting column mapping `id` mode.
        final String columnName = field.getName();
        if (groupType.containsField(columnName)) {
            return groupType.getType(columnName);
        }
        // Parquet is case-sensitive, but hive is not. all hive columns get converted to lowercase
        // check for direct match above but if no match found, try case-insensitive match
        for (org.apache.parquet.schema.Type type : groupType.getFields()) {
            if (type.getName().equalsIgnoreCase(columnName)) {
                return type;
            }
        }

        return null;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.client;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.NoSuchElementException;
import org.apache.hadoop.conf.Configuration;

import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.FileDataReadResult;
import io.delta.kernel.data.Row;
import io.delta.kernel.expressions.Expression;
import io.delta.kernel.fs.FileStatus;
import io.delta.kernel.parquet.ParquetBatchReader;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.Utils;

/**
 * Default implementation of {@code ParquetHandler} based on Hadoop APIs and the
 * {@link ParquetBatchReader}.
 */
public class DefaultParquetHandler
    implements ParquetHandler
{
    private final Configuration hadoopConf;

    /**
     * @param hadoopConf Hadoop configuration used to open and read Parquet files.
     */
    public DefaultParquetHandler(Configuration hadoopConf)
    {
        this.hadoopConf = hadoopConf;
    }

    @Override
    public CloseableIterator<FileReadContext> contextualizeFileReads(
        CloseableIterator<Row> fileIter,
        Expression predicate)
    {
        return new CloseableIterator<FileReadContext>()
        {
            @Override
            public void close()
                throws IOException
            {
                fileIter.close();
            }

            @Override
            public boolean hasNext()
            {
                return fileIter.hasNext();
            }

            @Override
            public FileReadContext next()
            {
                // Capture the row eagerly. Returning `() -> fileIter.next()` would
                // advance the underlying iterator on EVERY getScanFileRow() call,
                // consuming rows that belong to other contexts.
                final Row scanFileRow = fileIter.next();
                return () -> scanFileRow;
            }
        };
    }

    @Override
    public CloseableIterator<FileDataReadResult> readParquetFiles(
        CloseableIterator<FileReadContext> fileIter,
        StructType physicalSchema) throws IOException
    {
        return new CloseableIterator<FileDataReadResult>()
        {
            // Scan-file context of the file currently being read; null until first advance.
            private FileReadContext currentFile;
            // Open reader over currentFile; null until first advance and after close().
            private CloseableIterator<ColumnarBatch> currentFileReader;

            @Override
            public void close()
                throws IOException
            {
                // Safe close of both closeables: always attempt both, surface the first
                // failure and attach any later one as a suppressed exception.
                IOException failure = null;
                try {
                    closeCurrentReader();
                }
                catch (UncheckedIOException e) {
                    failure = e.getCause();
                }
                try {
                    fileIter.close();
                }
                catch (IOException e) {
                    if (failure == null) {
                        failure = e;
                    }
                    else {
                        failure.addSuppressed(e);
                    }
                }
                if (failure != null) {
                    throw failure;
                }
            }

            @Override
            public boolean hasNext()
            {
                // There is no file in reading or the current file being read has no more data
                // initialize the next file reader or return false if there are no more files to
                // read.
                if (currentFileReader == null || !currentFileReader.hasNext()) {
                    // Release the exhausted reader before (possibly) opening the next one,
                    // otherwise one reader per file is leaked for the whole scan.
                    closeCurrentReader();
                    if (fileIter.hasNext()) {
                        currentFile = fileIter.next();
                        FileStatus fileStatus = Utils.getFileStatus(currentFile.getScanFileRow());
                        ParquetBatchReader batchReader = new ParquetBatchReader(hadoopConf);
                        currentFileReader = batchReader.read(fileStatus.getPath(), physicalSchema);
                    }
                    else {
                        return false;
                    }
                }

                return currentFileReader.hasNext();
            }

            @Override
            public FileDataReadResult next()
            {
                // Iterator contract: fail explicitly instead of NPE when exhausted.
                if (!hasNext()) {
                    throw new NoSuchElementException("No more Parquet data to read");
                }
                final ColumnarBatch data = currentFileReader.next();
                // Capture the context now; hasNext() may advance currentFile later.
                final FileReadContext context = currentFile;
                return new FileDataReadResult()
                {
                    @Override
                    public ColumnarBatch getData()
                    {
                        return data;
                    }

                    @Override
                    public Row getScanFileRow()
                    {
                        return context.getScanFileRow();
                    }
                };
            }

            // Closes and clears the current file reader, if any. IOException is wrapped
            // unchecked because this is also called from hasNext(), which cannot throw it.
            private void closeCurrentReader()
            {
                if (currentFileReader != null) {
                    try {
                        currentFileReader.close();
                    }
                    catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                    finally {
                        currentFileReader = null;
                    }
                }
            }
        };
    }
}
Loading

0 comments on commit ade45e0

Please sign in to comment.