From dd88aa2ce23a2be8e896ce810954dd2e30901707 Mon Sep 17 00:00:00 2001 From: "rakesh.veeramacheneni" Date: Mon, 8 Jul 2024 10:36:37 -0700 Subject: [PATCH] [Kernel][Writes] Add support for writing data file stats --- build.sbt | 2 +- .../io/delta/kernel/expressions/Column.java | 14 ++ .../kernel/internal/TransactionImpl.java | 30 ++- .../internal/data/TransactionStateRow.java | 4 +- .../internal/skipping/DataSkippingUtils.java | 3 + .../kernel/statistics/DataFileStatistics.java | 215 ++++++++++++++++++ .../kernel/utils/DataFileStatistics.java | 104 --------- .../io/delta/kernel/utils/DataFileStatus.java | 1 + .../java/io/delta/kernel/utils/JsonUtil.java | 66 ++++++ .../io/delta/kernel/TransactionSuite.scala | 8 +- .../util/DataFileStatisticsSuite.scala | 194 ++++++++++++++++ .../internal/parquet/ParquetFileWriter.java | 2 + .../internal/parquet/ParquetStatsReader.java | 13 +- 13 files changed, 541 insertions(+), 115 deletions(-) create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/statistics/DataFileStatistics.java delete mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/utils/DataFileStatistics.java create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/utils/JsonUtil.java create mode 100644 kernel/kernel-api/src/test/scala/io/delta/kernel/internal/util/DataFileStatisticsSuite.scala diff --git a/build.sbt b/build.sbt index a9d3548de5..2024b91b9a 100644 --- a/build.sbt +++ b/build.sbt @@ -571,7 +571,7 @@ lazy val kernelApi = (project in file("kernel/kernel-api")) "org.roaringbitmap" % "RoaringBitmap" % "0.9.25", "org.slf4j" % "slf4j-api" % "1.7.36", - "com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5" % "test", + "com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5", "org.scalatest" %% "scalatest" % scalaTestVersion % "test", "junit" % "junit" % "4.13.2" % "test", "com.novocode" % "junit-interface" % "0.11" % "test", diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/expressions/Column.java b/kernel/kernel-api/src/main/java/io/delta/kernel/expressions/Column.java index 397cdc8b48..00a3e2c133 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/expressions/Column.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/expressions/Column.java @@ -61,6 +61,20 @@ public String toString() { return "column(" + quoteColumnPath(names) + ")"; } + /** + * Returns a new column that appends the input column name to the current column. Corresponds to + * an additional level of nested reference. 
+   *
+   * @param name the column name to append
+   * @return the new column
+   */
+  public Column append(String name) {
+    String[] newNames = new String[names.length + 1];
+    System.arraycopy(names, 0, newNames, 0, names.length);
+    newNames[names.length] = name;
+    return new Column(newNames);
+  }
+
   private static String quoteColumnPath(String[] names) {
     return Arrays.stream(names)
         .map(s -> format("`%s`", s.replace("`", "``")))
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java
index f509277183..161a0d2c54 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java
@@ -22,16 +22,23 @@
 import static io.delta.kernel.internal.util.Preconditions.checkState;
 import static io.delta.kernel.internal.util.Utils.toCloseableIterator;
 
-import io.delta.kernel.*;
+import io.delta.kernel.Meta;
+import io.delta.kernel.Operation;
+import io.delta.kernel.Transaction;
+import io.delta.kernel.TransactionCommitResult;
 import io.delta.kernel.data.Row;
 import io.delta.kernel.engine.Engine;
 import io.delta.kernel.exceptions.ConcurrentWriteException;
 import io.delta.kernel.expressions.Column;
-import io.delta.kernel.internal.actions.*;
+import io.delta.kernel.internal.actions.CommitInfo;
+import io.delta.kernel.internal.actions.Metadata;
+import io.delta.kernel.internal.actions.Protocol;
+import io.delta.kernel.internal.actions.SetTransaction;
 import io.delta.kernel.internal.data.TransactionStateRow;
 import io.delta.kernel.internal.fs.Path;
 import io.delta.kernel.internal.replay.ConflictChecker;
 import io.delta.kernel.internal.replay.ConflictChecker.TransactionRebaseState;
+import io.delta.kernel.internal.skipping.DataSkippingUtils;
 import io.delta.kernel.internal.util.Clock;
 import io.delta.kernel.internal.util.ColumnMapping;
 import io.delta.kernel.internal.util.FileNames;
@@ -322,6 +329,22 @@ private Map<String, String> getOperationParameters() {
    */
   public static List<Column> getStatisticsColumns(Engine engine, Row transactionState) {
-    // TODO: implement this once we start supporting collecting stats
-    return Collections.emptyList();
+    int numIndexedCols =
+        Integer.parseInt(
+            TransactionStateRow.getConfiguration(transactionState)
+                .getOrDefault(
+                    DataSkippingUtils.DATA_SKIPPING_NUM_INDEXED_COLS,
+                    String.valueOf(DataSkippingUtils.DEFAULT_DATA_SKIPPING_NUM_INDEXED_COLS)));
+
+    // Get the list of partition columns to exclude
+    Set<String> partitionColumns =
+        TransactionStateRow.getPartitionColumnsList(transactionState).stream()
+            .collect(Collectors.toSet());
+
+    // For now, only support the first numIndexedCols columns
+    return TransactionStateRow.getLogicalSchema(engine, transactionState).fields().stream()
+        .filter(p -> !partitionColumns.contains(p.getName()))
+        .limit(numIndexedCols)
+        .map(field -> new Column(field.getName()))
+        .collect(Collectors.toList());
   }
 }
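Note on the two changes above: Column#append gives callers an immutable way to build nested column references, and getStatisticsColumns now selects the top-level non-partition columns, capped by delta.dataSkippingNumIndexedCols. A small illustration (not part of the patch; the column names are made up):

    Column base = new Column("nested");            // column(`nested`)
    Column leaf = base.append("ac").append("aca"); // column(`nested`.`ac`.`aca`)
    // append() copies the underlying name array, so `base` still refers to
    // column(`nested`) after both calls.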
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/TransactionStateRow.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/TransactionStateRow.java
index 0a5df37965..be21978f5a 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/TransactionStateRow.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/TransactionStateRow.java
@@ -25,7 +25,9 @@
 import io.delta.kernel.internal.actions.Metadata;
 import io.delta.kernel.internal.util.VectorUtils;
 import io.delta.kernel.types.*;
-import java.util.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.stream.IntStream;
 
 public class TransactionStateRow extends GenericRow {
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/skipping/DataSkippingUtils.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/skipping/DataSkippingUtils.java
index d93bb71028..c5655cf5cd 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/skipping/DataSkippingUtils.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/skipping/DataSkippingUtils.java
@@ -32,6 +32,9 @@
 public class DataSkippingUtils {
 
+  public static final String DATA_SKIPPING_NUM_INDEXED_COLS = "delta.dataSkippingNumIndexedCols";
+  public static final int DEFAULT_DATA_SKIPPING_NUM_INDEXED_COLS = 32;
+
   /**
    * Given a {@code FilteredColumnarBatch} of scan files and the statistics schema to parse, return
    * the parsed JSON stats from the scan files.
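The two constants added above feed the getStatisticsColumns change earlier in this patch. A minimal sketch of how the table property resolves (the configuration map here is illustrative, not from the patch):

    Map<String, String> tableConfig = new HashMap<>();
    tableConfig.put(DataSkippingUtils.DATA_SKIPPING_NUM_INDEXED_COLS, "4"); // user override

    int numIndexedCols =
        Integer.parseInt(
            tableConfig.getOrDefault(
                DataSkippingUtils.DATA_SKIPPING_NUM_INDEXED_COLS,
                String.valueOf(DataSkippingUtils.DEFAULT_DATA_SKIPPING_NUM_INDEXED_COLS)));
    // numIndexedCols == 4 here; without the override it falls back to the default of 32.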
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/statistics/DataFileStatistics.java b/kernel/kernel-api/src/main/java/io/delta/kernel/statistics/DataFileStatistics.java
new file mode 100644
index 0000000000..c9cdc62b1b
--- /dev/null
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/statistics/DataFileStatistics.java
@@ -0,0 +1,215 @@
+/*
+ * Copyright (2023) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.statistics;
+
+import static java.time.ZoneOffset.UTC;
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
+import static java.time.temporal.ChronoUnit.MILLIS;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import io.delta.kernel.expressions.Column;
+import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.types.*;
+import io.delta.kernel.utils.JsonUtil;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Collections;
+import java.util.Map;
+
+/** Statistics about a data file in a Delta Lake table. */
+public class DataFileStatistics {
+  private final StructType dataSchema;
+  private final long numRecords;
+  private final Map<Column, Literal> minValues;
+  private final Map<Column, Literal> maxValues;
+  private final Map<Column, Long> nullCounts;
+
+  public static final int MICROSECONDS_PER_SECOND = 1_000_000;
+  public static final int NANOSECONDS_PER_MICROSECOND = 1_000;
+
+  private static final DateTimeFormatter TIMESTAMP_FORMATTER =
+      DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSX");
+
+  /**
+   * Create a new instance of {@link DataFileStatistics}.
+   *
+   * @param dataSchema Schema of the data file.
+   * @param numRecords Number of records in the data file.
+   * @param minValues Map of column to the minimum value of that column in the data file. If the
+   *     data file has all nulls for the column, the value will be null or not present in the map.
+   * @param maxValues Map of column to the maximum value of that column in the data file. If the
+   *     data file has all nulls for the column, the value will be null or not present in the map.
+   * @param nullCounts Map of column to the number of nulls of that column in the data file.
+   */
+  public DataFileStatistics(
+      StructType dataSchema,
+      long numRecords,
+      Map<Column, Literal> minValues,
+      Map<Column, Literal> maxValues,
+      Map<Column, Long> nullCounts) {
+    this.dataSchema = dataSchema;
+    this.numRecords = numRecords;
+    this.minValues = Collections.unmodifiableMap(minValues);
+    this.maxValues = Collections.unmodifiableMap(maxValues);
+    this.nullCounts = Collections.unmodifiableMap(nullCounts);
+  }
+
+  /**
+   * Get the number of records in the data file.
+   *
+   * @return Number of records in the data file.
+   */
+  public long getNumRecords() {
+    return numRecords;
+  }
+
+  /**
+   * Get the minimum values of the columns in the data file. The map may contain statistics for
+   * only a subset of columns in the data file.
+   *
+   * @return Map of column to the minimum value of that column in the data file.
+   */
+  public Map<Column, Literal> getMinValues() {
+    return minValues;
+  }
+
+  /**
+   * Get the maximum values of the columns in the data file. The map may contain statistics for
+   * only a subset of columns in the data file.
+   *
+   * @return Map of column to the maximum value of that column in the data file.
+   */
+  public Map<Column, Literal> getMaxValues() {
+    return maxValues;
+  }
+
+  /**
+   * Get the number of nulls of columns in the data file. The map may contain statistics for only a
+   * subset of columns in the data file.
+   *
+   * @return Map of column to the number of nulls of that column in the data file.
+   */
+  public Map<Column, Long> getNullCounts() {
+    return nullCounts;
+  }
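+
+  // Example of the serialized shape (abridged from DataFileStatisticsSuite later in this
+  // patch): nesting inside minValues/maxValues/nullCounts mirrors the schema's structs, e.g.
+  //   {"numRecords":100,
+  //    "minValues":{"IntegerType":1,"NestedStruct":{"aa":"a","ac":{"aca":1}}},
+  //    "nullCounts":{"IntegerType":1,"NestedStruct":{"aa":1,"ac":{"aca":1}}}}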
+
+  public String serializeAsJson() {
+    return JsonUtil.generate(
+        gen -> {
+          gen.writeStartObject();
+          gen.writeNumberField("numRecords", numRecords);
+
+          gen.writeObjectFieldStart("minValues");
+          // TODO: Prune dataSchema to a statsSchema instead?
+          writeJsonValues(
+              gen,
+              dataSchema,
+              minValues,
+              new Column(new String[0]),
+              (g, v) -> writeJsonValue(g, v));
+          gen.writeEndObject();
+
+          gen.writeObjectFieldStart("maxValues");
+          writeJsonValues(
+              gen,
+              dataSchema,
+              maxValues,
+              new Column(new String[0]),
+              (g, v) -> writeJsonValue(g, v));
+          gen.writeEndObject();
+
+          gen.writeObjectFieldStart("nullCounts");
+          writeJsonValues(
+              gen, dataSchema, nullCounts, new Column(new String[0]), (g, v) -> g.writeNumber(v));
+          gen.writeEndObject();
+
+          gen.writeEndObject();
+        });
+  }
+
+  private <T> void writeJsonValues(
+      JsonGenerator generator,
+      StructType schema,
+      Map<Column, T> values,
+      Column parentColPath,
+      JsonUtil.JsonValueWriter<T> writer)
+      throws IOException {
+    for (StructField field : schema.fields()) {
+      Column colPath = parentColPath.append(field.getName());
+      if (field.getDataType() instanceof StructType) {
+        generator.writeObjectFieldStart(field.getName());
+        writeJsonValues(generator, (StructType) field.getDataType(), values, colPath, writer);
+        generator.writeEndObject();
+      } else {
+        T value = values.get(colPath);
+        if (value != null) {
+          generator.writeFieldName(field.getName());
+          writer.write(generator, value);
+        }
+      }
+    }
+  }
+
+  private void writeJsonValue(JsonGenerator generator, Literal literal) throws IOException {
+    if (literal == null || literal.getValue() == null) {
+      return;
+    }
+    DataType type = literal.getDataType();
+    Object value = literal.getValue();
+    if (type instanceof BooleanType) {
+      generator.writeBoolean((Boolean) value);
+    } else if (type instanceof ByteType) {
+      generator.writeNumber(((Number) value).byteValue());
+    } else if (type instanceof ShortType) {
+      generator.writeNumber(((Number) value).shortValue());
+    } else if (type instanceof IntegerType) {
+      generator.writeNumber(((Number) value).intValue());
+    } else if (type instanceof LongType) {
+      generator.writeNumber(((Number) value).longValue());
+    } else if (type instanceof FloatType) {
+      generator.writeNumber(((Number) value).floatValue());
+    } else if (type instanceof DoubleType) {
+      generator.writeNumber(((Number) value).doubleValue());
+    } else if (type instanceof StringType) {
+      generator.writeString((String) value);
+    } else if (type instanceof BinaryType) {
+      generator.writeString(new String((byte[]) value, StandardCharsets.UTF_8));
+    } else if (type instanceof DecimalType) {
+      generator.writeNumber((BigDecimal) value);
+    } else if (type instanceof DateType) {
+      generator.writeString(
+          LocalDate.ofEpochDay(((Number) value).longValue()).format(ISO_LOCAL_DATE));
+    } else if (type instanceof TimestampType || type instanceof TimestampNTZType) {
+      long epochMicros = (long) value;
+      // Use floor division/modulo so that pre-epoch (negative) timestamps resolve to the
+      // correct second; truncating division would land one second too late.
+      long epochSeconds = Math.floorDiv(epochMicros, (long) MICROSECONDS_PER_SECOND);
+      int nanoAdjustment =
+          (int) Math.floorMod(epochMicros, (long) MICROSECONDS_PER_SECOND)
+              * NANOSECONDS_PER_MICROSECOND;
+      Instant instant = Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
+      generator.writeString(
+          TIMESTAMP_FORMATTER.format(ZonedDateTime.ofInstant(instant.truncatedTo(MILLIS), UTC)));
+    } else {
+      throw new IllegalArgumentException("Unsupported stats data type: " + type);
+    }
+  }
+}
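A quick sanity check of the timestamp path in writeJsonValue (illustrative, not part of the patch). Floor semantics matter for pre-epoch values, which is why the conversion above uses Math.floorDiv/Math.floorMod rather than truncating division:

    long epochMicros = -1L; // one microsecond before the epoch
    long epochSeconds = Math.floorDiv(epochMicros, 1_000_000L);                 // -1
    int nanoAdjustment = (int) Math.floorMod(epochMicros, 1_000_000L) * 1_000;  // 999_999_000
    Instant instant = Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
    // instant == 1969-12-31T23:59:59.999999Z; truncating division (-1 / 1_000_000 == 0)
    // plus a nano fix-up would land one second later, at 1970-01-01T00:00:00.999999Z.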
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/utils/DataFileStatistics.java b/kernel/kernel-api/src/main/java/io/delta/kernel/utils/DataFileStatistics.java
deleted file mode 100644
index 41006deffe..0000000000
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/utils/DataFileStatistics.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Copyright (2023) The Delta Lake Project Authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.delta.kernel.utils;
-
-import static io.delta.kernel.internal.util.Preconditions.checkArgument;
-
-import io.delta.kernel.expressions.Column;
-import io.delta.kernel.expressions.Literal;
-import java.util.Collections;
-import java.util.Map;
-
-/** Statistics about data file in a Delta Lake table. */
-public class DataFileStatistics {
-  private final long numRecords;
-  private final Map<Column, Literal> minValues;
-  private final Map<Column, Literal> maxValues;
-  private final Map<Column, Long> nullCounts;
-
-  /**
-   * Create a new instance of {@link DataFileStatistics}.
-   *
-   * @param numRecords Number of records in the data file.
-   * @param minValues Map of column to minimum value of it in the data file. If the data file has
-   *     all nulls for the column, the value will be null or not present in the map.
-   * @param maxValues Map of column to maximum value of it in the data file. If the data file has
-   *     all nulls for the column, the value will be null or not present in the map.
-   * @param nullCounts Map of column to number of nulls in the data file.
-   */
-  public DataFileStatistics(
-      long numRecords,
-      Map<Column, Literal> minValues,
-      Map<Column, Literal> maxValues,
-      Map<Column, Long> nullCounts) {
-    checkArgument(numRecords >= 0, "numRecords should be non-negative");
-    this.numRecords = numRecords;
-    this.minValues = Collections.unmodifiableMap(minValues);
-    this.maxValues = Collections.unmodifiableMap(maxValues);
-    this.nullCounts = Collections.unmodifiableMap(nullCounts);
-  }
-
-  /**
-   * Get the number of records in the data file.
-   *
-   * @return Number of records in the data file.
-   */
-  public long getNumRecords() {
-    return numRecords;
-  }
-
-  /**
-   * Get the minimum values of the columns in the data file. The map may contain statistics for
-   * only a subset of columns in the data file.
-   *
-   * @return Map of column to minimum value of it in the data file.
-   */
-  public Map<Column, Literal> getMinValues() {
-    return minValues;
-  }
-
-  /**
-   * Get the maximum values of the columns in the data file. The map may contain statistics for
-   * only a subset of columns in the data file.
-   *
-   * @return Map of column to minimum value of it in the data file.
-   */
-  public Map<Column, Literal> getMaxValues() {
-    return maxValues;
-  }
-
-  /**
-   * Get the number of nulls of columns in the data file. The map may contain statistics for only a
-   * subset of columns in the data file.
-   *
-   * @return Map of column to number of nulls in the data file.
-   */
-  public Map<Column, Long> getNullCounts() {
-    return nullCounts;
-  }
-
-  @Override
-  public String toString() {
-    return serializeAsJson();
-  }
-
-  public String serializeAsJson() {
-    // TODO: implement this. Full statistics serialization will be added as part of
-    // https://github.com/delta-io/delta/pull/3342. The PR is pending on a decision.
-    // For now just serialize the number of records.
-    return "{\"numRecords\":" + numRecords + "}";
-  }
-}
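The removed class above only ever serialized the record count. With the new serializer, files that carry no per-column statistics still emit the empty stats objects, which is why the TransactionSuite expectation below changes from

    {"numRecords":10}

to

    {"numRecords":10,"minValues":{},"maxValues":{},"nullCounts":{}}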
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/utils/DataFileStatus.java b/kernel/kernel-api/src/main/java/io/delta/kernel/utils/DataFileStatus.java
index 8ccaced22d..048456fe17 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/utils/DataFileStatus.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/utils/DataFileStatus.java
@@ -16,6 +16,7 @@
 package io.delta.kernel.utils;
 
+import io.delta.kernel.statistics.DataFileStatistics;
 import java.util.Optional;
 
 /**
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/utils/JsonUtil.java b/kernel/kernel-api/src/main/java/io/delta/kernel/utils/JsonUtil.java
new file mode 100644
index 0000000000..132bc59510
--- /dev/null
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/utils/JsonUtil.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright (2023) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.utils;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
+
+public class JsonUtil {
+
+  private JsonUtil() {}
+
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final JsonFactory FACTORY = new JsonFactory();
+
+  public static JsonFactory factory() {
+    return FACTORY;
+  }
+
+  public static ObjectMapper mapper() {
+    return OBJECT_MAPPER;
+  }
+
+  @FunctionalInterface
+  public interface ToJson {
+    void generate(JsonGenerator generator) throws IOException;
+  }
+
+  @FunctionalInterface
+  public interface JsonValueWriter<T> {
+    void write(JsonGenerator generator, T value) throws IOException;
+  }
+
+  /**
+   * Writes JSON with a Jackson {@link JsonGenerator} and returns the result as a string.
+ * + * @param toJson function that produces JSON using a {@link JsonGenerator} + * @return a JSON string produced from the generator + */ + public static String generate(ToJson toJson) { + try (StringWriter writer = new StringWriter(); + JsonGenerator generator = JsonUtil.factory().createGenerator(writer)) { + toJson.generate(generator); + generator.flush(); + return writer.toString(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/TransactionSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/TransactionSuite.scala index 0b1fb7eb01..2bdd3e5bdb 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/TransactionSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/TransactionSuite.scala @@ -27,9 +27,10 @@ import io.delta.kernel.internal.data.TransactionStateRow import io.delta.kernel.internal.util.Utils.toCloseableIterator import io.delta.kernel.internal.util.VectorUtils import io.delta.kernel.internal.util.VectorUtils.stringStringMapValue +import io.delta.kernel.statistics.DataFileStatistics import io.delta.kernel.test.{BaseMockJsonHandler, MockEngineUtils, VectorTestUtils} import io.delta.kernel.types.{LongType, StringType, StructType} -import io.delta.kernel.utils.{CloseableIterator, DataFileStatistics, DataFileStatus} +import io.delta.kernel.utils.{CloseableIterator, DataFileStatus} import org.scalatest.funsuite.AnyFunSuite import java.lang.{Long => JLong} @@ -122,7 +123,9 @@ class TransactionSuite extends AnyFunSuite with VectorTestUtils with MockEngineU actStats = actStats :+ add.getString(statsOrdinal) } - assert(actStats === Seq("{\"numRecords\":10}", "{\"numRecords\":20}")) + assert(actStats === Seq( + "{\"numRecords\":10,\"minValues\":{},\"maxValues\":{},\"nullCounts\":{}}", + "{\"numRecords\":20,\"minValues\":{},\"maxValues\":{},\"nullCounts\":{}}")) } } } @@ -219,6 +222,7 @@ object TransactionSuite extends VectorTestUtils with MockEngineUtils { def testStats(numRowsOpt: Option[Long]): Option[DataFileStatistics] = { numRowsOpt.map(numRows => { new DataFileStatistics( + testSchema, numRows, Map.empty[Column, Literal].asJava, // minValues - empty value as this is just for tests. Map.empty[Column, Literal].asJava, // maxValues - empty value as this is just for tests. diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/util/DataFileStatisticsSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/util/DataFileStatisticsSuite.scala new file mode 100644 index 0000000000..28bcb02c79 --- /dev/null +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/util/DataFileStatisticsSuite.scala @@ -0,0 +1,194 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.delta.kernel.internal.util + +import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} +import io.delta.kernel.expressions.{Column, Literal} +import io.delta.kernel.statistics.DataFileStatistics +import io.delta.kernel.types._ +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.must.Matchers + +import scala.collection.JavaConverters.mapAsJavaMapConverter + +class DataFileStatisticsSuite extends AnyFunSuite with Matchers { + + val objectMapper = new ObjectMapper() + + def jsonToNode(json: String): JsonNode = { + objectMapper.readTree(json) + } + + def areJsonNodesEqual(json1: String, json2: String): Boolean = { + val node1 = jsonToNode(json1) + val node2 = jsonToNode(json2) + node1 == node2 + } + + test("DataFileStatistics serialization with all types") { + val nestedStructType = new StructType() + .add("aa", StringType.STRING) + .add("ac", new StructType().add("aca", IntegerType.INTEGER)) + + val schema = new StructType() + .add("ByteType", ByteType.BYTE) + .add("ShortType", ShortType.SHORT) + .add("IntegerType", IntegerType.INTEGER) + .add("LongType", LongType.LONG) + .add("FloatType", FloatType.FLOAT) + .add("DoubleType", DoubleType.DOUBLE) + .add("DecimalType", new DecimalType(10, 2)) + .add("StringType", StringType.STRING) + .add("DateType", DateType.DATE) + .add("TimestampType", TimestampType.TIMESTAMP) + .add("TimestampNTZType", TimestampNTZType.TIMESTAMP_NTZ) + .add("BinaryType", BinaryType.BINARY) + .add("NestedStruct", nestedStructType) + + // Define minValues with nested struct + val minValues = Map( + new Column("ByteType") -> Literal.ofByte(1.toByte), + new Column("ShortType") -> Literal.ofShort(1.toShort), + new Column("IntegerType") -> Literal.ofInt(1), + new Column("LongType") -> Literal.ofLong(1L), + new Column("FloatType") -> Literal.ofFloat(0.1f), + new Column("DoubleType") -> Literal.ofDouble(0.1), + new Column("DecimalType") -> Literal.ofDecimal(new java.math.BigDecimal("123.45"), 10, 2), + new Column("StringType") -> Literal.ofString("a"), + new Column("DateType") -> Literal.ofDate(1), + new Column("TimestampType") -> Literal.ofTimestamp(1L), + new Column("TimestampNTZType") -> Literal.ofTimestampNtz(1L), + new Column("BinaryType") -> Literal.ofBinary("a".getBytes), + new Column(Array("NestedStruct", "aa")) -> Literal.ofString("a"), + new Column(Array("NestedStruct", "ac", "aca")) -> Literal.ofInt(1) + ).asJava + + // Define maxValues with nested struct + val maxValues = Map( + new Column("ByteType") -> Literal.ofByte(10.toByte), + new Column("ShortType") -> Literal.ofShort(10.toShort), + new Column("IntegerType") -> Literal.ofInt(10), + new Column("LongType") -> Literal.ofLong(10L), + new Column("FloatType") -> Literal.ofFloat(10.1f), + new Column("DoubleType") -> Literal.ofDouble(10.1), + new Column("DecimalType") -> Literal.ofDecimal(new java.math.BigDecimal("456.78"), 10, 2), + new Column("StringType") -> Literal.ofString("z"), + new Column("DateType") -> Literal.ofDate(10), + new Column("TimestampType") -> Literal.ofTimestamp(10L), + new Column("TimestampNTZType") -> Literal.ofTimestampNtz(10L), + new Column("BinaryType") -> Literal.ofBinary("z".getBytes), + new Column(Array("NestedStruct", "aa")) -> Literal.ofString("z"), + new Column(Array("NestedStruct", "ac", "aca")) -> Literal.ofInt(10) + ).asJava + + // Define nullCounts with nested struct + val nullCounts = Map( + new Column("ByteType") -> 1L, + new Column("ShortType") -> 1L, + new Column("IntegerType") -> 1L, + new Column("LongType") -> 1L, + new 
Column("FloatType") -> 1L, + new Column("DoubleType") -> 1L, + new Column("DecimalType") -> 1L, + new Column("StringType") -> 1L, + new Column("DateType") -> 1L, + new Column("TimestampType") -> 1L, + new Column("TimestampNTZType") -> 1L, + new Column("BinaryType") -> 1L, + new Column(Array("NestedStruct", "aa")) -> 1L, + new Column(Array("NestedStruct", "ac", "aca")) -> 1L + ) + + val numRecords = 100L + + val stats = new DataFileStatistics( + schema, + 100, + minValues, + maxValues, + nullCounts.map { case (k, v) => (k, java.lang.Long.valueOf(v)) }.asJava + ) + + val expectedJson = + """{ + | "numRecords": 100, + | "minValues": { + | "ByteType": 1, + | "ShortType": 1, + | "IntegerType": 1, + | "LongType": 1, + | "FloatType": 0.1, + | "DoubleType": 0.1, + | "DecimalType": 123.45, + | "StringType": "a", + | "DateType": "1970-01-02", + | "TimestampType": "1970-01-01T00:00:00.000Z", + | "TimestampNTZType": "1970-01-01T00:00:00.000Z", + | "BinaryType": "a", + | "NestedStruct": { + | "aa": "a", + | "ac": { + | "aca": 1 + | } + | } + | }, + | "maxValues": { + | "ByteType": 10, + | "ShortType": 10, + | "IntegerType": 10, + | "LongType": 10, + | "FloatType": 10.1, + | "DoubleType": 10.1, + | "DecimalType": 456.78, + | "StringType": "z", + | "DateType": "1970-01-11", + | "TimestampType": "1970-01-01T00:00:00.000Z", + | "TimestampNTZType": "1970-01-01T00:00:00.000Z", + | "BinaryType": "z", + | "NestedStruct": { + | "aa": "z", + | "ac": { + | "aca": 10 + | } + | } + | }, + | "nullCounts": { + | "ByteType": 1, + | "ShortType": 1, + | "IntegerType": 1, + | "LongType": 1, + | "FloatType": 1, + | "DoubleType": 1, + | "DecimalType": 1, + | "StringType": 1, + | "DateType": 1, + | "TimestampType": 1, + | "TimestampNTZType": 1, + | "BinaryType": 1, + | "NestedStruct": { + | "aa": 1, + | "ac": { + | "aca": 1 + | } + | } + |} + |}""".stripMargin + + val json = stats.serializeAsJson() + + assert(areJsonNodesEqual(json, expectedJson)) + } +} diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetFileWriter.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetFileWriter.java index 43781173ae..3e2fb51c71 100644 --- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetFileWriter.java +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetFileWriter.java @@ -26,6 +26,7 @@ import io.delta.kernel.defaults.internal.parquet.ParquetColumnWriters.ColumnWriter; import io.delta.kernel.expressions.Column; import io.delta.kernel.internal.util.Utils; +import io.delta.kernel.statistics.DataFileStatistics; import io.delta.kernel.types.StructType; import io.delta.kernel.utils.*; import java.io.IOException; @@ -389,6 +390,7 @@ private DataFileStatus constructDataFileStatus(String path, StructType dataSchem if (statsColumns.isEmpty()) { stats = new DataFileStatistics( + dataSchema, numRows, emptyMap() /* minValues */, emptyMap() /* maxValues */, diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetStatsReader.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetStatsReader.java index f2f30653d7..2117bc6428 100644 --- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetStatsReader.java +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetStatsReader.java @@ -23,8 +23,8 @@ import io.delta.kernel.expressions.Column; import 
io.delta.kernel.expressions.Literal; +import io.delta.kernel.statistics.DataFileStatistics; import io.delta.kernel.types.*; -import io.delta.kernel.utils.DataFileStatistics; import java.io.IOException; import java.math.BigDecimal; import java.math.BigInteger; @@ -33,9 +33,14 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.shaded.com.google.common.collect.ImmutableMultimap; import org.apache.hadoop.shaded.com.google.common.collect.Multimap; -import org.apache.parquet.column.statistics.*; +import org.apache.parquet.column.statistics.BinaryStatistics; +import org.apache.parquet.column.statistics.IntStatistics; +import org.apache.parquet.column.statistics.LongStatistics; +import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.hadoop.ParquetFileReader; -import org.apache.parquet.hadoop.metadata.*; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; @@ -116,7 +121,7 @@ private static DataFileStatistics constructFileStats( maxValues.put(statsColumn, maxValue); } - return new DataFileStatistics(rowCount, minValues, maxValues, nullCounts); + return new DataFileStatistics(dataSchema, rowCount, minValues, maxValues, nullCounts); } private static Literal decodeMinMaxStat(
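
To tie the pieces together, a rough end-to-end sketch of the new API (illustrative only; the class name and values are made up, and ParquetStatsReader builds the same maps from real Parquet footers):

    import io.delta.kernel.expressions.Column;
    import io.delta.kernel.expressions.Literal;
    import io.delta.kernel.statistics.DataFileStatistics;
    import io.delta.kernel.types.IntegerType;
    import io.delta.kernel.types.StructType;
    import java.util.Collections;
    import java.util.Map;

    public class StatsRoundTripExample {
      public static void main(String[] args) {
        // One-column schema; stats are keyed by Column references into it.
        StructType schema = new StructType().add("id", IntegerType.INTEGER);
        Column id = new Column("id");

        Map<Column, Literal> minValues = Collections.singletonMap(id, Literal.ofInt(1));
        Map<Column, Literal> maxValues = Collections.singletonMap(id, Literal.ofInt(10));
        Map<Column, Long> nullCounts = Collections.singletonMap(id, 0L);

        DataFileStatistics stats =
            new DataFileStatistics(schema, 10L, minValues, maxValues, nullCounts);

        // {"numRecords":10,"minValues":{"id":1},"maxValues":{"id":10},"nullCounts":{"id":0}}
        System.out.println(stats.serializeAsJson());
      }
    }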