[Kernel][Writes] Add support for writing data file stats
raveeram-db committed Sep 18, 2024
1 parent 7bafe8e commit dd88aa2
Showing 13 changed files with 541 additions and 115 deletions.
2 changes: 1 addition & 1 deletion build.sbt
@@ -571,7 +571,7 @@ lazy val kernelApi = (project in file("kernel/kernel-api"))
"org.roaringbitmap" % "RoaringBitmap" % "0.9.25",
"org.slf4j" % "slf4j-api" % "1.7.36",

"com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5" % "test",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"junit" % "junit" % "4.13.2" % "test",
"com.novocode" % "junit-interface" % "0.11" % "test",
@@ -61,6 +61,20 @@ public String toString() {
return "column(" + quoteColumnPath(names) + ")";
}

+  /**
+   * Returns a new column that appends the input column name to the current column. Corresponds to
+   * an additional level of nested reference.
+   *
+   * @param name the column name to append
+   * @return the new column
+   */
+  public Column append(String name) {
+    String[] newNames = new String[names.length + 1];
+    System.arraycopy(names, 0, newNames, 0, names.length);
+    newNames[names.length] = name;
+    return new Column(newNames);
+  }

private static String quoteColumnPath(String[] names) {
return Arrays.stream(names)
.map(s -> format("`%s`", s.replace("`", "``")))
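For illustration, a quick sketch of how the new append method composes nested column references (the column names are made up, and the rendered toString output assumes quoteColumnPath joins the backtick-quoted parts with dots):

    Column address = new Column("address");   // column(`address`)
    Column city = address.append("city");     // column(`address`.`city`), one more nesting level

The original Column is left untouched; append copies the name array and returns a new instance.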
@@ -22,16 +22,23 @@
import static io.delta.kernel.internal.util.Preconditions.checkState;
import static io.delta.kernel.internal.util.Utils.toCloseableIterator;

-import io.delta.kernel.*;
+import io.delta.kernel.Meta;
+import io.delta.kernel.Operation;
+import io.delta.kernel.Transaction;
+import io.delta.kernel.TransactionCommitResult;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.ConcurrentWriteException;
import io.delta.kernel.expressions.Column;
-import io.delta.kernel.internal.actions.*;
+import io.delta.kernel.internal.actions.CommitInfo;
+import io.delta.kernel.internal.actions.Metadata;
+import io.delta.kernel.internal.actions.Protocol;
+import io.delta.kernel.internal.actions.SetTransaction;
import io.delta.kernel.internal.data.TransactionStateRow;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.replay.ConflictChecker;
import io.delta.kernel.internal.replay.ConflictChecker.TransactionRebaseState;
+import io.delta.kernel.internal.skipping.DataSkippingUtils;
import io.delta.kernel.internal.util.Clock;
import io.delta.kernel.internal.util.ColumnMapping;
import io.delta.kernel.internal.util.FileNames;
@@ -322,6 +329,23 @@ private Map<String, String> getOperationParameters() {
*/
public static List<Column> getStatisticsColumns(Engine engine, Row transactionState) {
-    // TODO: implement this once we start supporting collecting stats
-    return Collections.emptyList();
+    int numIndexedCols =
+        Integer.parseInt(
+            TransactionStateRow.getConfiguration(transactionState)
+                .getOrDefault(
+                    DataSkippingUtils.DATA_SKIPPING_NUM_INDEXED_COLS,
+                    String.valueOf(DataSkippingUtils.DEFAULT_DATA_SKIPPING_NUM_INDEXED_COLS)));
+
+    // Collect the set of partition columns to exclude from stats collection
+    Set<String> partitionColumns =
+        TransactionStateRow.getPartitionColumnsList(transactionState).stream()
+            .collect(Collectors.toSet());
+
+    // For now, only support the first numIndexedCols non-partition columns
+    return TransactionStateRow.getLogicalSchema(engine, transactionState).fields().stream()
+        .filter(field -> !partitionColumns.contains(field.getName()))
+        .limit(numIndexedCols)
+        .map(field -> new Column(field.getName()))
+        .collect(Collectors.toList());
}
}
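As a usage sketch, here is what the rewritten getStatisticsColumns would yield; the schema, partition column, and property value below are hypothetical:

    // Hypothetical transaction state: logical schema (date, id, name, value),
    // partitioned by "date", with delta.dataSkippingNumIndexedCols set to "2".
    List<Column> statsColumns = getStatisticsColumns(engine, transactionState);
    // => [column(`id`), column(`name`)]
    // "date" is excluded as a partition column, and "value" falls outside the
    // first two eligible columns. The limit applies to top-level fields of the
    // logical schema only.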
@@ -25,7 +25,9 @@
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.util.VectorUtils;
import io.delta.kernel.types.*;
-import java.util.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.stream.IntStream;

public class TransactionStateRow extends GenericRow {
@@ -32,6 +32,9 @@

public class DataSkippingUtils {

+  public static final String DATA_SKIPPING_NUM_INDEXED_COLS = "delta.dataSkippingNumIndexedCols";
+  public static final int DEFAULT_DATA_SKIPPING_NUM_INDEXED_COLS = 32;

/**
* Given a {@code FilteredColumnarBatch} of scan files and the statistics schema to parse, return
* the parsed JSON stats from the scan files.
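To make the lookup concrete, the property resolution that getStatisticsColumns performs with these constants boils down to the following; the configuration map here is hypothetical:

    // Hypothetical table configuration; the default of 32 applies whenever
    // delta.dataSkippingNumIndexedCols is unset.
    Map<String, String> config =
        Collections.singletonMap(DataSkippingUtils.DATA_SKIPPING_NUM_INDEXED_COLS, "8");
    int numIndexedCols =
        Integer.parseInt(
            config.getOrDefault(
                DataSkippingUtils.DATA_SKIPPING_NUM_INDEXED_COLS,
                String.valueOf(DataSkippingUtils.DEFAULT_DATA_SKIPPING_NUM_INDEXED_COLS)));
    // numIndexedCols == 8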
@@ -0,0 +1,215 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.statistics;

import static java.time.ZoneOffset.UTC;
import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
import static java.time.temporal.ChronoUnit.MILLIS;

import com.fasterxml.jackson.core.JsonGenerator;
import io.delta.kernel.expressions.Column;
import io.delta.kernel.expressions.Literal;
import io.delta.kernel.types.*;
import io.delta.kernel.utils.JsonUtil;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Map;

/** Statistics about a data file in a Delta Lake table. */
public class DataFileStatistics {
private final StructType dataSchema;
private final long numRecords;
private final Map<Column, Literal> minValues;
private final Map<Column, Literal> maxValues;
private final Map<Column, Long> nullCounts;

public static final int MICROSECONDS_PER_SECOND = 1_000_000;
public static final int NANOSECONDS_PER_MICROSECOND = 1_000;

private static final DateTimeFormatter TIMESTAMP_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSX");

/**
* Create a new instance of {@link DataFileStatistics}.
*
* @param dataSchema Schema of the data file.
* @param numRecords Number of records in the data file.
* @param minValues Map of column to its minimum value in the data file. If the data file has
*     all nulls for the column, the value will be null or not present in the map.
* @param maxValues Map of column to its maximum value in the data file. If the data file has
*     all nulls for the column, the value will be null or not present in the map.
* @param nullCounts Map of column to number of nulls in the data file.
*/
public DataFileStatistics(
StructType dataSchema,
long numRecords,
Map<Column, Literal> minValues,
Map<Column, Literal> maxValues,
Map<Column, Long> nullCounts) {
this.dataSchema = dataSchema;
this.numRecords = numRecords;
this.minValues = Collections.unmodifiableMap(minValues);
this.maxValues = Collections.unmodifiableMap(maxValues);
this.nullCounts = Collections.unmodifiableMap(nullCounts);
}

/**
* Get the number of records in the data file.
*
* @return Number of records in the data file.
*/
public long getNumRecords() {
return numRecords;
}

/**
* Get the minimum values of the columns in the data file. The map may contain statistics for only
* a subset of columns in the data file.
*
* @return Map of column to its minimum value in the data file.
*/
public Map<Column, Literal> getMinValues() {
return minValues;
}

/**
* Get the maximum values of the columns in the data file. The map may contain statistics for only
* a subset of columns in the data file.
*
* @return Map of column to its maximum value in the data file.
*/
public Map<Column, Literal> getMaxValues() {
return maxValues;
}

/**
* Get the number of nulls of columns in the data file. The map may contain statistics for only a
* subset of columns in the data file.
*
* @return Map of column to number of nulls in the data file.
*/
public Map<Column, Long> getNullCounts() {
return nullCounts;
}

public String serializeAsJson() {
return JsonUtil.generate(
gen -> {
gen.writeStartObject();
gen.writeNumberField("numRecords", numRecords);

gen.writeObjectFieldStart("minValues");
// TODO: Prune dataSchema to a statsSchema instead?
writeJsonValues(
gen,
dataSchema,
minValues,
new Column(new String[0]),
(g, v) -> writeJsonValue(g, v));
gen.writeEndObject();

gen.writeObjectFieldStart("maxValues");
writeJsonValues(
gen,
dataSchema,
maxValues,
new Column(new String[0]),
(g, v) -> writeJsonValue(g, v));
gen.writeEndObject();

gen.writeObjectFieldStart("nullCounts");
writeJsonValues(
gen, dataSchema, nullCounts, new Column(new String[0]), (g, v) -> g.writeNumber(v));
gen.writeEndObject();

gen.writeEndObject();
});
}

private <T> void writeJsonValues(
JsonGenerator generator,
StructType schema,
Map<Column, T> values,
Column parentColPath,
JsonUtil.JsonValueWriter<T> writer)
throws IOException {
for (StructField field : schema.fields()) {
Column colPath = parentColPath.append(field.getName());
if (field.getDataType() instanceof StructType) {
generator.writeObjectFieldStart(field.getName());
writeJsonValues(generator, (StructType) field.getDataType(), values, colPath, writer);
generator.writeEndObject();
} else {
T value = values.get(colPath);
if (value != null) {
generator.writeFieldName(field.getName());
writer.write(generator, value);
}
}
}
}

private void writeJsonValue(JsonGenerator generator, Literal literal) throws IOException {
if (literal == null || literal.getValue() == null) {
return;
}
DataType type = literal.getDataType();
Object value = literal.getValue();
if (type instanceof BooleanType) {
generator.writeBoolean((Boolean) value);
} else if (type instanceof ByteType) {
generator.writeNumber(((Number) value).byteValue());
} else if (type instanceof ShortType) {
generator.writeNumber(((Number) value).shortValue());
} else if (type instanceof IntegerType) {
generator.writeNumber(((Number) value).intValue());
} else if (type instanceof LongType) {
generator.writeNumber(((Number) value).longValue());
} else if (type instanceof FloatType) {
generator.writeNumber(((Number) value).floatValue());
} else if (type instanceof DoubleType) {
generator.writeNumber(((Number) value).doubleValue());
} else if (type instanceof StringType) {
generator.writeString((String) value);
} else if (type instanceof BinaryType) {
generator.writeString(new String((byte[]) value, StandardCharsets.UTF_8));
} else if (type instanceof DecimalType) {
generator.writeNumber((BigDecimal) value);
} else if (type instanceof DateType) {
generator.writeString(
LocalDate.ofEpochDay(((Number) value).longValue()).format(ISO_LOCAL_DATE));
} else if (type instanceof TimestampType || type instanceof TimestampNTZType) {
long epochMicros = (long) value;
// Use floor division/modulus so pre-epoch (negative) timestamps land in
// the correct second; truncating division would leave the seconds part
// unadjusted and shift such values forward by a full second once the
// negative nano remainder is wrapped around.
long epochSeconds = Math.floorDiv(epochMicros, (long) MICROSECONDS_PER_SECOND);
int nanoAdjustment =
    (int) Math.floorMod(epochMicros, (long) MICROSECONDS_PER_SECOND)
        * NANOSECONDS_PER_MICROSECOND;
Instant instant = Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
generator.writeString(
TIMESTAMP_FORMATTER.format(ZonedDateTime.ofInstant(instant.truncatedTo(MILLIS), UTC)));
} else {
throw new IllegalArgumentException("Unsupported stats data type: " + type);
}
}
}
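A minimal end-to-end sketch of constructing and serializing stats (the schema, column names, and values are made up; map lookups by column path assume Column implements value equality):

    StructType schema =
        new StructType().add("id", LongType.LONG).add("name", StringType.STRING);

    Map<Column, Literal> minValues = new HashMap<>();
    minValues.put(new Column("id"), Literal.ofLong(1L));
    minValues.put(new Column("name"), Literal.ofString("aardvark"));

    Map<Column, Literal> maxValues = new HashMap<>();
    maxValues.put(new Column("id"), Literal.ofLong(100L));
    maxValues.put(new Column("name"), Literal.ofString("zebra"));

    Map<Column, Long> nullCounts = new HashMap<>();
    nullCounts.put(new Column("id"), 0L);
    nullCounts.put(new Column("name"), 2L);

    String json =
        new DataFileStatistics(schema, 100L, minValues, maxValues, nullCounts).serializeAsJson();
    // => {"numRecords":100,"minValues":{"id":1,"name":"aardvark"},
    //     "maxValues":{"id":100,"name":"zebra"},"nullCounts":{"id":0,"name":2}}

Fields are emitted in schema order, since the serializer walks dataSchema rather than iterating the maps.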
