checkstyle
vkorukanti committed Aug 8, 2023
1 parent c04d2df commit 8a1bdc9
Showing 37 changed files with 1,437 additions and 250 deletions.
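
The hunks below apply three mechanical checkstyle fixes: opening braces move from their own line onto the declaration, private members move below the public API they support, and continuation/array whitespace is normalized. A minimal before/after sketch of the first two patterns, on a hypothetical class rather than one from this commit:

// Before: brace on its own line, private helper declared first.
public class Example
    implements Runnable
{
    private void helper() { /* ... */ }

    @Override
    public void run() { helper(); }
}

// After: brace on the declaration line, private members last.
public class Example
    implements Runnable {
    @Override
    public void run() { helper(); }

    private void helper() { /* ... */ }
}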
@@ -48,40 +48,12 @@
 import static io.delta.kernel.defaults.internal.DefaultKernelUtils.daysSinceEpoch;
 
 public class DefaultExpressionHandler
-    implements ExpressionHandler
-{
+    implements ExpressionHandler {
     @Override
     public ExpressionEvaluator getEvaluator(StructType batchSchema, Expression expression) {
         return new DefaultExpressionEvaluator(expression);
     }
 
-    private static class DefaultExpressionEvaluator
-        implements ExpressionEvaluator {
-        private final Expression expression;
-
-        private DefaultExpressionEvaluator(Expression expression) {
-            this.expression = expression;
-        }
-
-        @Override
-        public ColumnVector eval(ColumnarBatch input) {
-            if (expression instanceof Literal) {
-                return evalLiteralExpression(input, (Literal) expression);
-            }
-
-            if (expression.dataType().equals(BooleanType.INSTANCE)) {
-                return evalBooleanOutputExpression(input, expression);
-            }
-            // TODO: Boolean output type expressions are good enough for first preview release
-            // which enables partition pruning and file skipping using file stats.
-
-            throw new UnsupportedOperationException("not yet implemented");
-        }
-
-        @Override
-        public void close() { /* nothing to close */ }
-    }
-
     private static ColumnVector evalBooleanOutputExpression(
         ColumnarBatch input, Expression expression) {
         checkArgument(expression.dataType().equals(BooleanType.INSTANCE),
@@ -133,4 +105,31 @@ private static ColumnVector evalLiteralExpression(ColumnarBatch input, Literal l
         throw new UnsupportedOperationException(
             "unsupported expression encountered: " + literal);
     }
+
+    private static class DefaultExpressionEvaluator
+        implements ExpressionEvaluator {
+        private final Expression expression;
+
+        private DefaultExpressionEvaluator(Expression expression) {
+            this.expression = expression;
+        }
+
+        @Override
+        public ColumnVector eval(ColumnarBatch input) {
+            if (expression instanceof Literal) {
+                return evalLiteralExpression(input, (Literal) expression);
+            }
+
+            if (expression.dataType().equals(BooleanType.INSTANCE)) {
+                return evalBooleanOutputExpression(input, expression);
+            }
+            // TODO: Boolean output type expressions are good enough for first preview release
+            // which enables partition pruning and file skipping using file stats.
+
+            throw new UnsupportedOperationException("not yet implemented");
+        }
+
+        @Override
+        public void close() { /* nothing to close */ }
+    }
 }
@@ -21,8 +21,7 @@
 import io.delta.kernel.data.Row;
 
 public class DefaultFileReadContext
-    implements FileReadContext
-{
+    implements FileReadContext {
     private final Row scanFileRow;
 
     public DefaultFileReadContext(Row scanFileRow) {
@@ -34,8 +34,7 @@
 import io.delta.kernel.utils.Utils;
 
 public class DefaultFileSystemClient
-    implements FileSystemClient
-{
+    implements FileSystemClient {
     private final Configuration hadoopConf;
 
     public DefaultFileSystemClient(Configuration hadoopConf) {
@@ -72,6 +71,12 @@ public CloseableIterator<FileStatus> listFrom(String filePath) {
         }
     }
 
+    @Override
+    public CloseableIterator<ByteArrayInputStream> readFiles(
+        CloseableIterator<Tuple2<String, Tuple2<Integer, Integer>>> iter) {
+        return iter.map(elem -> getStream(elem._1, elem._2._1, elem._2._2));
+    }
+
     private ByteArrayInputStream getStream(String filePath, Integer offset, Integer size) {
         Path path = new Path(filePath);
         try {
@@ -91,10 +96,4 @@ private ByteArrayInputStream getStream(String filePath, Integer offset, Integer
                 "Could not resolve the FileSystem for path %s", filePath), ex);
         }
     }
-
-    @Override
-    public CloseableIterator<ByteArrayInputStream> readFiles(
-            CloseableIterator<Tuple2<String, Tuple2<Integer, Integer>>> iter) {
-        return iter.map(elem -> getStream(elem._1, elem._2._1, elem._2._2));
-    }
 }
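
The relocated readFiles maps each (path, (offset, size)) tuple to a ByteArrayInputStream via getStream. A hedged caller sketch: the path and byte ranges are invented, client stands for a DefaultFileSystemClient instance, and it assumes the kernel's Tuple2 utility exposes public _1/_2 fields with a two-argument constructor (consistent with the lambda above) and that Utils.toCloseableIterator wraps a plain java.util.Iterator:

// Hypothetical: fetch two byte ranges of the same file.
CloseableIterator<Tuple2<String, Tuple2<Integer, Integer>>> ranges =
    Utils.toCloseableIterator(Arrays.asList(
        new Tuple2<>("/tmp/example.parquet", new Tuple2<>(0, 4096)),
        new Tuple2<>("/tmp/example.parquet", new Tuple2<>(4096, 1024))).iterator());

try (CloseableIterator<ByteArrayInputStream> streams = client.readFiles(ranges)) {
    while (streams.hasNext()) {
        ByteArrayInputStream in = streams.next(); // bytes [offset, offset + size)
        // ... consume the range ...
    }
}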
@@ -49,8 +49,7 @@
 
 public class DefaultJsonHandler
     extends DefaultFileHandler
-    implements JsonHandler
-{
+    implements JsonHandler {
     private final ObjectMapper objectMapper;
     private final Configuration hadoopConf;
     private final int maxBatchSize;
@@ -31,8 +31,7 @@
 
 import io.delta.kernel.defaults.internal.parquet.ParquetBatchReader;
 
-public class DefaultParquetHandler extends DefaultFileHandler implements ParquetHandler
-{
+public class DefaultParquetHandler extends DefaultFileHandler implements ParquetHandler {
     private final Configuration hadoopConf;
 
     public DefaultParquetHandler(Configuration hadoopConf) {
@@ -24,8 +24,7 @@
 import io.delta.kernel.client.TableClient;
 
 public class DefaultTableClient
-    implements TableClient
-{
+    implements TableClient {
     private final Configuration hadoopConf;
 
     private DefaultTableClient(Configuration hadoopConf) {
@@ -49,31 +49,6 @@ public static final MessageType pruneSchema(
         return new MessageType("fileSchema", pruneFields(fileSchema, deltaType));
     }
 
-    private static List<Type> pruneFields(GroupType type, StructType deltaDataType) {
-        // prune fields including nested pruning like in pruneSchema
-        return deltaDataType.fields().stream()
-            .map(column -> {
-                Type subType = findSubFieldType(type, column);
-                if (subType != null) {
-                    return prunedType(subType, column.getDataType());
-                } else {
-                    return null;
-                }
-            })
-            .filter(Objects::nonNull)
-            .collect(Collectors.toList());
-    }
-
-    private static Type prunedType(Type type, DataType deltaType) {
-        if (type instanceof GroupType && deltaType instanceof StructType) {
-            GroupType groupType = (GroupType) type;
-            StructType structType = (StructType) deltaType;
-            return groupType.withNewFields(pruneFields(groupType, structType));
-        } else {
-            return type;
-        }
-    }
-
     /**
      * Search for the Parquet type in {@code groupType} of subfield which is equivalent to
      * given {@code field}.
@@ -165,4 +140,29 @@ public static int daysSinceEpoch(Date date) {
         LocalDate localDate = date.toLocalDate();
         return (int) ChronoUnit.DAYS.between(EPOCH, localDate);
     }
+
+    private static List<Type> pruneFields(GroupType type, StructType deltaDataType) {
+        // prune fields including nested pruning like in pruneSchema
+        return deltaDataType.fields().stream()
+            .map(column -> {
+                Type subType = findSubFieldType(type, column);
+                if (subType != null) {
+                    return prunedType(subType, column.getDataType());
+                } else {
+                    return null;
+                }
+            })
+            .filter(Objects::nonNull)
+            .collect(Collectors.toList());
+    }
+
+    private static Type prunedType(Type type, DataType deltaType) {
+        if (type instanceof GroupType && deltaType instanceof StructType) {
+            GroupType groupType = (GroupType) type;
+            StructType structType = (StructType) deltaType;
+            return groupType.withNewFields(pruneFields(groupType, structType));
+        } else {
+            return type;
+        }
+    }
 }
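
For reference, the relocated daysSinceEpoch returns whole days between 1970-01-01 and the given java.sql.Date (it backs date-literal evaluation; see the daysSinceEpoch import in DefaultExpressionHandler above). A standalone check of the same computation, with arbitrarily chosen example dates:

import java.sql.Date;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

public class DaysSinceEpochCheck {
    private static final LocalDate EPOCH = LocalDate.ofEpochDay(0); // 1970-01-01

    // Same computation as DefaultKernelUtils.daysSinceEpoch.
    static int daysSinceEpoch(Date date) {
        return (int) ChronoUnit.DAYS.between(EPOCH, date.toLocalDate());
    }

    public static void main(String[] args) {
        System.out.println(daysSinceEpoch(Date.valueOf("1970-01-11"))); // 10
        System.out.println(daysSinceEpoch(Date.valueOf("1969-12-31"))); // -1
    }
}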
@@ -26,8 +26,7 @@
 import io.delta.kernel.types.StructType;
 
 public class DefaultColumnarBatch
-    implements ColumnarBatch
-{
+    implements ColumnarBatch {
     private final int size;
     private final StructType schema;
     private final List<ColumnVector> columnVectors;
@@ -55,7 +54,7 @@ public ColumnVector getColumnVector(int ordinal) {
 
     @Override
     public ColumnarBatch withNewColumn(int ordinal, StructField structField,
-            ColumnVector columnVector) {
+        ColumnVector columnVector) {
         if (ordinal < 0 || ordinal > columnVectors.size()) {
             throw new IllegalArgumentException("Invalid ordinal: " + ordinal);
         }
@@ -37,8 +37,7 @@
 import io.delta.kernel.types.StructField;
 import io.delta.kernel.types.StructType;
 
-public class DefaultJsonRow implements Row
-{
+public class DefaultJsonRow implements Row {
     private final Object[] parsedValues;
     private final StructType readSchema;
 
@@ -99,6 +99,21 @@ public static Converter createConverter(
         throw new UnsupportedOperationException(typeFromClient + " is not supported");
     }
 
+    static boolean[] initNullabilityVector(int size) {
+        boolean[] nullability = new boolean[size];
+        // Initialize all values as null. As Parquet calls this converter only for non-null
+        // values, make the corresponding value to false.
+        Arrays.fill(nullability, true);
+
+        return nullability;
+    }
+
+    static void setNullabilityToTrue(boolean[] nullability, int start, int end) {
+        // Initialize all values as null. As Parquet calls this converter only for non-null
+        // values, make the corresponding value to false.
+        Arrays.fill(nullability, start, end, true);
+    }
+
     public interface BaseConverter {
         ColumnVector getDataColumnVector(int batchSize);
 
@@ -493,19 +508,4 @@ public boolean moveToNextRow(long fileRowIndex) {
             return moveToNextRow();
         }
     }
-
-    static boolean[] initNullabilityVector(int size) {
-        boolean[] nullability = new boolean[size];
-        // Initialize all values as null. As Parquet calls this converter only for non-null
-        // values, make the corresponding value to false.
-        Arrays.fill(nullability, true);
-
-        return nullability;
-    }
-
-    static void setNullabilityToTrue(boolean[] nullability, int start, int end) {
-        // Initialize all values as null. As Parquet calls this converter only for non-null
-        // values, make the corresponding value to false.
-        Arrays.fill(nullability, start, end, true);
-    }
 }
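
A note on the two relocated helpers: in the nullability vector, true means "null so far". initNullabilityVector starts every slot as null because Parquet invokes a converter only for non-null cells, each of which then flips its slot to false; setNullabilityToTrue resets a range when the batch working state is reused (see resetWorkingState below). A standalone sketch of that lifecycle, with an invented batch size of 4:

import java.util.Arrays;

public class NullabilityLifecycle {
    public static void main(String[] args) {
        // initNullabilityVector(4): every slot starts as null (true).
        boolean[] nullability = new boolean[4];
        Arrays.fill(nullability, true);

        // A converter received values for rows 0 and 2 only, so just those
        // slots are marked non-null.
        nullability[0] = false;
        nullability[2] = false;
        System.out.println(Arrays.toString(nullability)); // [false, true, false, true]

        // setNullabilityToTrue(nullability, 0, 4): reset before reuse.
        Arrays.fill(nullability, 0, 4, true);
        System.out.println(Arrays.toString(nullability)); // [true, true, true, true]
    }
}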
@@ -108,26 +108,6 @@ public ColumnarBatch getDataAsColumnarBatch(int batchSize) {
         return batch;
     }
 
-    /**
-     * @return true if all members were null
-     */
-    private boolean moveConvertersToNextRow(Optional<Long> fileRowIndex) {
-        long memberNullCount = Arrays.stream(converters)
-            .map(converter -> (ParquetConverters.BaseConverter) converter)
-            .map(converter -> {
-                if (fileRowIndex.isPresent() &&
-                    converter instanceof ParquetConverters.FileRowIndexColumnConverter) {
-                    return ((ParquetConverters.FileRowIndexColumnConverter) converter)
-                        .moveToNextRow(fileRowIndex.get());
-                } else {
-                    return converter.moveToNextRow();
-                }
-            })
-            .filter(result -> result)
-            .count();
-        return memberNullCount == converters.length;
-    }
-
     /**
      * @param fileRowIndex the file row index of the row processed
      */
@@ -166,13 +146,6 @@ public ColumnVector getDataColumnVector(int batchSize) {
         return vector;
     }
 
-    private ColumnVector[] collectMemberVectors(int batchSize) {
-        return Arrays.stream(converters)
-            .map(converter -> ((ParquetConverters.BaseConverter) converter).getDataColumnVector(
-                batchSize))
-            .toArray(ColumnVector[]::new);
-    }
-
     @Override
     public void resizeIfNeeded() {
         if (nullability.length == currentRowIndex) {
@@ -187,4 +160,31 @@ public void resetWorkingState() {
         this.currentRowIndex = 0;
         this.nullability = ParquetConverters.initNullabilityVector(this.nullability.length);
     }
+
+    /**
+     * @return true if all members were null
+     */
+    private boolean moveConvertersToNextRow(Optional<Long> fileRowIndex) {
+        long memberNullCount = Arrays.stream(converters)
+            .map(converter -> (ParquetConverters.BaseConverter) converter)
+            .map(converter -> {
+                if (fileRowIndex.isPresent() &&
+                    converter instanceof ParquetConverters.FileRowIndexColumnConverter) {
+                    return ((ParquetConverters.FileRowIndexColumnConverter) converter)
+                        .moveToNextRow(fileRowIndex.get());
+                } else {
+                    return converter.moveToNextRow();
+                }
+            })
+            .filter(result -> result)
+            .count();
+        return memberNullCount == converters.length;
+    }
+
+    private ColumnVector[] collectMemberVectors(int batchSize) {
+        return Arrays.stream(converters)
+            .map(converter -> ((ParquetConverters.BaseConverter) converter).getDataColumnVector(
+                batchSize))
+            .toArray(ColumnVector[]::new);
+    }
 }
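
The relocated moveConvertersToNextRow encodes the struct-null rule: a struct row is null only when every member converter reported null for that row. A tiny standalone illustration of the same count-based check:

import java.util.stream.IntStream;

public class StructNullRule {
    // Struct is null iff all members were null, mirroring moveConvertersToNextRow.
    static boolean structIsNull(boolean[] memberWasNull) {
        long memberNullCount = IntStream.range(0, memberWasNull.length)
            .filter(i -> memberWasNull[i])
            .count();
        return memberNullCount == memberWasNull.length;
    }

    public static void main(String[] args) {
        System.out.println(structIsNull(new boolean[] {true, true}));  // true
        System.out.println(structIsNull(new boolean[] {true, false})); // false
    }
}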
@@ -89,7 +89,7 @@ public void evalLiterals() {
         testCases.add(Literal.of(new Timestamp(2342342342232L)));
         testCases.add(Literal.ofNull(TimestampType.INSTANCE));
 
-        ColumnarBatch[] inputBatches = new ColumnarBatch[]{
+        ColumnarBatch[] inputBatches = new ColumnarBatch[] {
             new DefaultColumnarBatch(0, inputSchema, data),
             new DefaultColumnarBatch(25, inputSchema, data),
             new DefaultColumnarBatch(128, inputSchema, data)
@@ -151,7 +151,7 @@ public void evalBooleanExpressionSimple() {
         for (int size : Arrays.asList(26, 234, 567)) {
             StructType inputSchema = new StructType()
                 .add("intType", IntegerType.INSTANCE);
-            ColumnVector[] data = new ColumnVector[]{
+            ColumnVector[] data = new ColumnVector[] {
                 intVector(size)
             };
 
@@ -182,7 +182,7 @@ public void evalBooleanExpressionComplex() {
             StructType inputSchema = new StructType()
                 .add("intType", IntegerType.INSTANCE)
                 .add("longType", LongType.INSTANCE);
-            ColumnVector[] data = new ColumnVector[]{
+            ColumnVector[] data = new ColumnVector[] {
                 intVector(size),
                 longVector(size),
             };
@@ -59,8 +59,8 @@ public class TestDefaultJsonHandler {
     public void contextualizeFiles()
         throws Exception {
         try (CloseableIterator<Row> inputScanFiles = testFiles();
-             CloseableIterator<FileReadContext> fileReadContexts =
-                 JSON_HANDLER.contextualizeFileReads(testFiles(), Literal.TRUE)) {
+            CloseableIterator<FileReadContext> fileReadContexts =
+                JSON_HANDLER.contextualizeFileReads(testFiles(), Literal.TRUE)) {
             while (inputScanFiles.hasNext() || fileReadContexts.hasNext()) {
                 assertEquals(inputScanFiles.hasNext(), fileReadContexts.hasNext());
                 Row inputScanFile = inputScanFiles.next();