Skip to content

Commit

Permalink
[Kernel][Writes] Add support for writing data file stats
Browse files Browse the repository at this point in the history
  • Loading branch information
raveeram-db committed Jul 8, 2024
1 parent 87f0685 commit 25f689b
Show file tree
Hide file tree
Showing 12 changed files with 543 additions and 121 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ lazy val kernelApi = (project in file("kernel/kernel-api"))
"org.roaringbitmap" % "RoaringBitmap" % "0.9.25",
"org.slf4j" % "slf4j-api" % "1.7.36",

"com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5" % "test",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"junit" % "junit" % "4.13.2" % "test",
"com.novocode" % "junit-interface" % "0.11" % "test",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public final class Column implements Expression {
* Create a column expression for referring to a column.
*/
public Column(String name) {
this.names = new String[] {name};
this.names = new String[]{name};
}

/**
Expand All @@ -62,7 +62,21 @@ public List<Expression> getChildren() {
@Override
public String toString() {

return "column(" + quoteColumnPath(names) + ")";
return "column(" + quoteColumnPath(names) + ")";
}

/**
* Returns a new column that appends the input column name to the current column.
* Corresponds to an additional level of nested reference.
*
* @param name the column name to append
* @return the new column
*/
public Column append(String name) {
String[] newNames = new String[names.length + 1];
System.arraycopy(names, 0, newNames, 0, names.length);
newNames[names.length] = name;
return new Column(newNames);
}

private static String quoteColumnPath(String[] names) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.delta.kernel.*;
import io.delta.kernel.Meta;
import io.delta.kernel.Operation;
import io.delta.kernel.Transaction;
import io.delta.kernel.TransactionCommitResult;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.ConcurrentWriteException;
Expand All @@ -32,11 +35,15 @@
import io.delta.kernel.utils.CloseableIterable;
import io.delta.kernel.utils.CloseableIterator;

import io.delta.kernel.internal.actions.*;
import io.delta.kernel.internal.actions.CommitInfo;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.actions.SetTransaction;
import io.delta.kernel.internal.data.TransactionStateRow;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.replay.ConflictChecker;
import io.delta.kernel.internal.replay.ConflictChecker.TransactionRebaseState;
import io.delta.kernel.internal.skipping.DataSkippingUtils;
import io.delta.kernel.internal.util.Clock;
import io.delta.kernel.internal.util.FileNames;
import io.delta.kernel.internal.util.InCommitTimestampUtils;
Expand Down Expand Up @@ -296,6 +303,14 @@ private Map<String, String> getOperationParameters() {
*/
public static List<Column> getStatisticsColumns(Engine engine, Row transactionState) {
// TODO: implement this once we start supporting collecting stats
return Collections.emptyList();
int numIndexedCols = Integer.parseInt(TransactionStateRow.getConfiguration(transactionState)
.getOrDefault(DataSkippingUtils.DATA_SKIPPING_NUM_INDEXED_COLS,
String.valueOf(DataSkippingUtils.DEFAULT_DATA_SKIPPING_NUM_INDEXED_COLS)));

// For now, only support the first numIndexedCols columns
return TransactionStateRow.getLogicalSchema(engine, transactionState).fields().stream()
.limit(numIndexedCols)
.map(field -> new Column(field.getName()))
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@
*/
package io.delta.kernel.internal.data;

import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
import static java.util.stream.Collectors.toMap;

import io.delta.kernel.Transaction;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.types.*;
import io.delta.kernel.types.ArrayType;
import io.delta.kernel.types.MapType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructType;

import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.util.VectorUtils;
Expand All @@ -31,7 +36,8 @@ public class TransactionStateRow extends GenericRow {
private static final StructType SCHEMA = new StructType()
.add("logicalSchemaString", StringType.STRING)
.add("partitionColumns", new ArrayType(StringType.STRING, false))
.add("tablePath", StringType.STRING);
.add("tablePath", StringType.STRING)
.add("configuration", new MapType(StringType.STRING, StringType.STRING, false));

private static final Map<String, Integer> COL_NAME_TO_ORDINAL =
IntStream.range(0, SCHEMA.length())
Expand All @@ -43,6 +49,7 @@ public static TransactionStateRow of(Metadata metadata, String tablePath) {
valueMap.put(COL_NAME_TO_ORDINAL.get("logicalSchemaString"), metadata.getSchemaString());
valueMap.put(COL_NAME_TO_ORDINAL.get("partitionColumns"), metadata.getPartitionColumns());
valueMap.put(COL_NAME_TO_ORDINAL.get("tablePath"), tablePath);
valueMap.put(COL_NAME_TO_ORDINAL.get("configuration"), metadata.getConfigurationMapValue());
return new TransactionStateRow(valueMap);
}

Expand Down Expand Up @@ -85,4 +92,16 @@ public static List<String> getPartitionColumnsList(Row transactionState) {
public static String getTablePath(Row transactionState) {
return transactionState.getString(COL_NAME_TO_ORDINAL.get("tablePath"));
}

/**
* Get the configuration from the transaction state {@link Row} returned by
* {@link Transaction#getTransactionState(Engine)}
*
* @param transactionState Transaction state state {@link Row}
* @return Configuration of the table as a map.
*/
public static Map<String, String> getConfiguration(Row transactionState) {
return VectorUtils.toJavaMap(
transactionState.getMap(COL_NAME_TO_ORDINAL.get("configuration")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@

public class DataSkippingUtils {

public static final String DATA_SKIPPING_NUM_INDEXED_COLS = "delta.dataSkippingNumIndexedCols";
public static final int DEFAULT_DATA_SKIPPING_NUM_INDEXED_COLS = 32;

/**
* Given a {@code FilteredColumnarBatch} of scan files and the statistics schema to parse,
* return the parsed JSON stats from the scan files.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.statistics;

import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Map;
import static java.time.ZoneOffset.UTC;
import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
import static java.time.temporal.ChronoUnit.MILLIS;

import com.fasterxml.jackson.core.JsonGenerator;

import io.delta.kernel.expressions.Column;
import io.delta.kernel.expressions.Literal;
import io.delta.kernel.types.*;
import io.delta.kernel.utils.JsonUtil;

/**
* Statistics about data file in a Delta Lake table.
*/
public class DataFileStatistics {
private StructType dataSchema;
private final long numRecords;
private final Map<Column, Literal> minValues;
private final Map<Column, Literal> maxValues;
private final Map<Column, Long> nullCounts;

public static final int MICROSECONDS_PER_SECOND = 1_000_000;
public static final int NANOSECONDS_PER_MICROSECOND = 1_000;

private static final DateTimeFormatter TIMESTAMP_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSX");


/**
* Create a new instance of {@link DataFileStatistics}.
*
* @param dataSchema Schema of the data file.
* @param numRecords Number of records in the data file.
* @param minValues Map of column to minimum value of it in the data file. If the data file has
* all nulls for the column, the value will be null or not present in the
* map.
* @param maxValues Map of column to maximum value of it in the data file. If the data file has
* all nulls for the column, the value will be null or not present in the
* map.
* @param nullCounts Map of column to number of nulls in the data file.
*/
public DataFileStatistics(
StructType dataSchema,
long numRecords,
Map<Column, Literal> minValues,
Map<Column, Literal> maxValues,
Map<Column, Long> nullCounts) {
this.dataSchema = dataSchema;
this.numRecords = numRecords;
this.minValues = Collections.unmodifiableMap(minValues);
this.maxValues = Collections.unmodifiableMap(maxValues);
this.nullCounts = Collections.unmodifiableMap(nullCounts);
}

/**
* Get the number of records in the data file.
*
* @return Number of records in the data file.
*/
public long getNumRecords() {
return numRecords;
}

/**
* Get the minimum values of the columns in the data file. The map may contain statistics for
* only a subset of columns in the data file.
*
* @return Map of column to minimum value of it in the data file.
*/
public Map<Column, Literal> getMinValues() {
return minValues;
}

/**
* Get the maximum values of the columns in the data file. The map may contain statistics for
* only a subset of columns in the data file.
*
* @return Map of column to minimum value of it in the data file.
*/
public Map<Column, Literal> getMaxValues() {
return maxValues;
}

/**
* Get the number of nulls of columns in the data file. The map may contain statistics for only
* a subset of columns in the data file.
*
* @return Map of column to number of nulls in the data file.
*/
public Map<Column, Long> getNullCounts() {
return nullCounts;
}

public String serializeAsJson() {
return JsonUtil.generate(gen -> {
gen.writeStartObject();
gen.writeNumberField("numRecords", numRecords);

gen.writeObjectFieldStart("minValues");
// TODO: Prune dataSchema to a statsSchema instead?
writeJsonValues(gen, dataSchema, minValues, new Column(new String[0]),
(g, v) -> writeJsonValue(g, v));
gen.writeEndObject();

gen.writeObjectFieldStart("maxValues");
writeJsonValues(gen, dataSchema, maxValues, new Column(new String[0]),
(g, v) -> writeJsonValue(g, v));
gen.writeEndObject();

gen.writeObjectFieldStart("nullCounts");
writeJsonValues(gen, dataSchema, nullCounts, new Column(new String[0]),
(g, v) -> g.writeNumber(v));
gen.writeEndObject();

gen.writeEndObject();
});
}

private <T> void writeJsonValues(JsonGenerator generator,
StructType schema,
Map<Column, T> values,
Column parentColPath,
JsonUtil.JsonValueWriter<T> writer) throws IOException {
for (StructField field : schema.fields()) {
Column colPath = parentColPath.append(field.getName());
if (field.getDataType() instanceof StructType) {
generator.writeObjectFieldStart(field.getName());
writeJsonValues(generator, (StructType) field.getDataType(), values, colPath,
writer);
generator.writeEndObject();
} else {
T value = values.get(colPath);
if (value != null) {
generator.writeFieldName(field.getName());
writer.write(generator, value);
}
}
}
}

private void writeJsonValue(JsonGenerator generator,
Literal literal) throws IOException {
if (literal == null || literal.getValue() == null) {
return;
}
DataType type = literal.getDataType();
Object value = literal.getValue();
if (type instanceof BooleanType) {
generator.writeBoolean((Boolean) value);
} else if (type instanceof ByteType) {
generator.writeNumber(((Number) value).byteValue());
} else if (type instanceof ShortType) {
generator.writeNumber(((Number) value).shortValue());
} else if (type instanceof IntegerType) {
generator.writeNumber(((Number) value).intValue());
} else if (type instanceof LongType) {
generator.writeNumber(((Number) value).longValue());
} else if (type instanceof FloatType) {
generator.writeNumber(((Number) value).floatValue());
} else if (type instanceof DoubleType) {
generator.writeNumber(((Number) value).doubleValue());
} else if (type instanceof StringType) {
generator.writeString((String) value);
} else if (type instanceof BinaryType) {
generator.writeString(new String((byte[]) value, StandardCharsets.UTF_8));
} else if (type instanceof DecimalType) {
generator.writeNumber((BigDecimal) value);
} else if (type instanceof DateType) {
generator.writeString(
LocalDate.ofEpochDay(((Number) value).longValue()).format(ISO_LOCAL_DATE));
} else if (type instanceof TimestampType || type instanceof TimestampNTZType) {
long epochMicros = (long) value;
long epochSeconds = epochMicros / MICROSECONDS_PER_SECOND;
int nanoAdjustment =
(int) (epochMicros % MICROSECONDS_PER_SECOND) * NANOSECONDS_PER_MICROSECOND;
if (nanoAdjustment < 0) {
nanoAdjustment += MICROSECONDS_PER_SECOND * NANOSECONDS_PER_MICROSECOND;
}
Instant instant = Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
generator.writeString(TIMESTAMP_FORMATTER.format(
ZonedDateTime.ofInstant(instant.truncatedTo(MILLIS), UTC)));
} else {
throw new IllegalArgumentException("Unsupported stats data type: " + type);
}
}
}
Loading

0 comments on commit 25f689b

Please sign in to comment.