Skip to content

Commit

Permalink
clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
Anonymous committed Aug 1, 2023
1 parent 115abb3 commit c3dce83
Show file tree
Hide file tree
Showing 39 changed files with 357 additions and 242 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,38 @@
* limitations under the License.
*/

package io.delta.kernel.client;
package io.delta.kernel.defaults.client;

import java.sql.Date;
import java.sql.Timestamp;
import java.util.Optional;

import io.delta.kernel.client.ExpressionHandler;
import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.data.vector.DefaultBooleanVector;
import io.delta.kernel.data.vector.DefaultConstantVector;
import io.delta.kernel.defaults.internal.data.vector.DefaultBooleanVector;
import io.delta.kernel.defaults.internal.data.vector.DefaultConstantVector;
import io.delta.kernel.expressions.Expression;
import io.delta.kernel.expressions.ExpressionEvaluator;
import io.delta.kernel.expressions.Literal;
import io.delta.kernel.types.*;
import io.delta.kernel.types.BinaryType;
import io.delta.kernel.types.BooleanType;
import io.delta.kernel.types.ByteType;
import io.delta.kernel.types.DataType;
import io.delta.kernel.types.DateType;
import io.delta.kernel.types.DoubleType;
import io.delta.kernel.types.FloatType;
import io.delta.kernel.types.IntegerType;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.ShortType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructType;
import io.delta.kernel.types.TimestampType;
import io.delta.kernel.utils.CloseableIterator;
import static io.delta.kernel.DefaultKernelUtils.checkArgument;
import static io.delta.kernel.DefaultKernelUtils.daysSinceEpoch;

import static io.delta.kernel.defaults.internal.DefaultKernelUtils.checkArgument;
import static io.delta.kernel.defaults.internal.DefaultKernelUtils.daysSinceEpoch;

public class DefaultExpressionHandler
implements ExpressionHandler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.client;

import static java.util.Objects.requireNonNull;
package io.delta.kernel.defaults.client;

import io.delta.kernel.client.FileHandler;
import io.delta.kernel.client.FileReadContext;
import io.delta.kernel.data.Row;
import io.delta.kernel.expressions.Expression;
import io.delta.kernel.utils.CloseableIterator;

import static java.util.Objects.requireNonNull;

/**
* Default client implementation of {@link FileHandler}. It splits file as one split.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.client;

import static java.util.Objects.requireNonNull;
package io.delta.kernel.defaults.client;

import io.delta.kernel.client.FileReadContext;
import io.delta.kernel.data.Row;

import static java.util.Objects.requireNonNull;

public class DefaultFileReadContext
implements FileReadContext
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.client;
package io.delta.kernel.defaults.client;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
Expand All @@ -22,11 +22,11 @@
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import io.delta.kernel.client.FileSystemClient;
import io.delta.kernel.fs.FileStatus;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.Tuple2;
Expand Down Expand Up @@ -74,7 +74,8 @@ public CloseableIterator<FileStatus> listFrom(String filePath)
}
}

private ByteArrayInputStream getStream(String filePath, Integer offset, Integer size) {
private ByteArrayInputStream getStream(String filePath, Integer offset, Integer size)
{
Path path = new Path(filePath);
try {
FileSystem fs = path.getFileSystem(hadoopConf);
Expand All @@ -83,20 +84,23 @@ private ByteArrayInputStream getStream(String filePath, Integer offset, Integer
byte[] buff = new byte[size];
stream.readFully(buff);
return new ByteArrayInputStream(buff);
} catch (IOException ex) {
}
catch (IOException ex) {
throw new RuntimeException(String.format(
"IOException reading from file %s at offset %s size %s",
filePath, offset, size), ex);
"IOException reading from file %s at offset %s size %s",
filePath, offset, size), ex);
}
} catch (IOException ex) {
}
catch (IOException ex) {
throw new RuntimeException(String.format(
"Could not resolve the FileSystem for path %s", filePath), ex);
"Could not resolve the FileSystem for path %s", filePath), ex);
}
}

@Override
public CloseableIterator<ByteArrayInputStream> readFiles(
CloseableIterator<Tuple2<String, Tuple2<Integer, Integer>>> iter) {
CloseableIterator<Tuple2<String, Tuple2<Integer, Integer>>> iter)
{
return iter.map(elem -> getStream(elem._1, elem._2._1, elem._2._2));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,36 +13,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.client;
package io.delta.kernel.defaults.client;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import io.delta.kernel.client.FileReadContext;
import io.delta.kernel.client.JsonHandler;
import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.DefaultJsonRow;
import io.delta.kernel.data.DefaultRowBasedColumnarBatch;
import io.delta.kernel.data.FileDataReadResult;
import io.delta.kernel.data.Row;
import io.delta.kernel.defaults.internal.data.DefaultJsonRow;
import io.delta.kernel.defaults.internal.data.DefaultRowBasedColumnarBatch;
import io.delta.kernel.fs.FileStatus;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.Utils;
import static io.delta.kernel.DefaultKernelUtils.checkArgument;

import static io.delta.kernel.defaults.internal.DefaultKernelUtils.checkArgument;

public class DefaultJsonHandler
extends DefaultFileHandler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.client;
package io.delta.kernel.defaults.client;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import io.delta.kernel.client.FileReadContext;
import io.delta.kernel.client.ParquetHandler;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.FileDataReadResult;
import io.delta.kernel.data.Row;
import io.delta.kernel.defaults.internal.parquet.ParquetBatchReader;
import io.delta.kernel.fs.FileStatus;
import io.delta.kernel.parquet.ParquetBatchReader;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.Utils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.client;
package io.delta.kernel.defaults.client;

import org.apache.hadoop.conf.Configuration;

import io.delta.kernel.client.ExpressionHandler;
import io.delta.kernel.client.FileSystemClient;
import io.delta.kernel.client.JsonHandler;
import io.delta.kernel.client.ParquetHandler;
import io.delta.kernel.client.TableClient;

public class DefaultTableClient
implements TableClient
implements TableClient
{
private final Configuration hadoopConf;

Expand Down Expand Up @@ -53,10 +59,12 @@ public ParquetHandler getParquetHandler()

/**
* Create an instance of {@link DefaultTableClient}.
*
* @param hadoopConf Hadoop configuration to use.
* @return
*/
public static DefaultTableClient create(Configuration hadoopConf) {
public static DefaultTableClient create(Configuration hadoopConf)
{
return new DefaultTableClient(hadoopConf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel;
package io.delta.kernel.defaults.internal;

import java.sql.Date;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
Expand Down Expand Up @@ -51,27 +50,31 @@ public static final MessageType pruneSchema(
return new MessageType("fileSchema", pruneFields(fileSchema, deltaType));
}

private static List<Type> pruneFields(GroupType type, StructType deltaDataType) {
private static List<Type> pruneFields(GroupType type, StructType deltaDataType)
{
// prune fields including nested pruning like in pruneSchema
return deltaDataType.fields().stream()
.map(column -> {
Type subType = findSubFieldType(type, column);
if (subType != null) {
return prunedType(subType, column.getDataType());
} else {
return null;
}
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
.map(column -> {
Type subType = findSubFieldType(type, column);
if (subType != null) {
return prunedType(subType, column.getDataType());
}
else {
return null;
}
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
}

private static Type prunedType(Type type, DataType deltaType) {
private static Type prunedType(Type type, DataType deltaType)
{
if (type instanceof GroupType && deltaType instanceof StructType) {
GroupType groupType = (GroupType) type;
StructType structType = (StructType) deltaType;
return groupType.withNewFields(pruneFields(groupType, structType));
} else {
}
else {
return type;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.data;
package io.delta.kernel.defaults.internal.data;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.data;
package io.delta.kernel.defaults.internal.data;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

import io.delta.kernel.data.Row;
import io.delta.kernel.types.ArrayType;
import io.delta.kernel.types.BooleanType;
import io.delta.kernel.types.DataType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.data;
package io.delta.kernel.defaults.internal.data;

import java.util.List;
import java.util.Map;

import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.types.DataType;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import static io.delta.kernel.DefaultKernelUtils.checkArgument;

import static io.delta.kernel.defaults.internal.DefaultKernelUtils.checkArgument;

/**
* {@link ColumnarBatch} wrapper around list of {@link Row} objects.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.data.vector;
package io.delta.kernel.defaults.internal.data.vector;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import static java.util.Objects.requireNonNull;

import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.Row;
import io.delta.kernel.types.DataType;
import static io.delta.kernel.DefaultKernelUtils.checkArgument;

import static io.delta.kernel.defaults.internal.DefaultKernelUtils.checkArgument;
import static java.util.Objects.requireNonNull;

/**
* Abstract implementation of {@link ColumnVector} that provides the default functionality
Expand Down
Loading

0 comments on commit c3dce83

Please sign in to comment.