Skip to content

Commit

Permalink
[Kernel] Delta table state reconstruction
Browse files Browse the repository at this point in the history
(Cherry-pick of 27111ee to branch-2.5)

This PR is part of #1783.

It adds the Delta table state reconstruction and end-2-end API implementation.

Integration tests with different types of Delta tables.

Closes #1857
  • Loading branch information
vkorukanti committed Jun 27, 2023
1 parent fc1a187 commit 01dd1ed
Show file tree
Hide file tree
Showing 175 changed files with 5,511 additions and 115 deletions.
81 changes: 58 additions & 23 deletions kernel/kernel-api/src/main/java/io/delta/kernel/Scan.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
package io.delta.kernel;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import io.delta.kernel.client.FileReadContext;
import io.delta.kernel.client.ParquetHandler;
Expand All @@ -32,16 +35,19 @@
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.Utils;

import io.delta.kernel.internal.util.PartitionUtils;

/**
* Represents a scan of a Delta table.
*/
public interface Scan {
public interface Scan
{
/**
* Get an iterator of data files to scan.
*
* @param tableClient {@link TableClient} instance to use in Delta Kernel.
* @return iterator of {@link ColumnarBatch}s where each row in each batch corresponds to one
* scan file
* scan file
*/
CloseableIterator<ColumnarBatch> getScanFiles(TableClient tableClient);

Expand All @@ -51,7 +57,7 @@ public interface Scan {
*
* @return the remaining filter as an {@link Expression}.
*/
Expression getRemainingFilter();
Optional<Expression> getRemainingFilter();

/**
* Get the scan state associated with the current scan. This state is common across all
Expand All @@ -68,40 +74,69 @@ public interface Scan {
* @param tableClient Connector provided {@link TableClient} implementation.
* @param scanState Scan state returned by {@link Scan#getScanState(TableClient)}
* @param scanFileRowIter an iterator of {@link Row}s. Each {@link Row} represents one scan file
* from the {@link ColumnarBatch} returned by
* {@link Scan#getScanFiles(TableClient)}
* from the {@link ColumnarBatch} returned by
* {@link Scan#getScanFiles(TableClient)}
* @param filter An optional filter that can be used for data skipping while reading the
* scan files.
* scan files.
* @return Data read from the input scan files as an iterator of {@link DataReadResult}s. Each
* {@link DataReadResult} instance contains the data read and an optional selection
* vector that indicates data rows as valid or invalid. It is the responsibility of the
* caller to close this iterator.
* {@link DataReadResult} instance contains the data read and an optional selection
* vector that indicates data rows as valid or invalid. It is the responsibility of the
* caller to close this iterator.
* @throws IOException when error occurs while reading the data.
*/
static CloseableIterator<DataReadResult> readData(
TableClient tableClient,
Row scanState,
CloseableIterator<Row> scanFileRowIter,
Optional<Expression> filter) throws IOException {
TableClient tableClient,
Row scanState,
CloseableIterator<Row> scanFileRowIter,
Optional<Expression> filter) throws IOException
{
StructType physicalSchema = Utils.getPhysicalSchema(tableClient, scanState);
StructType logicalSchema = Utils.getLogicalSchema(tableClient, scanState);
List<String> partitionColumns = Utils.getPartitionColumns(scanState);
Set<String> partitionColumnsSet = new HashSet<>(partitionColumns);

StructType readSchema = Utils.getPhysicalSchema(scanState);
StructType readSchemaWithoutPartitionColumns =
PartitionUtils.physicalSchemaWithoutPartitionColumns(
logicalSchema,
physicalSchema,
partitionColumnsSet);

ParquetHandler parquetHandler = tableClient.getParquetHandler();

CloseableIterator<FileReadContext> filesReadContextsIter =
parquetHandler.contextualizeFileReads(
scanFileRowIter,
filter.orElse(Literal.TRUE));
parquetHandler.contextualizeFileReads(
scanFileRowIter,
filter.orElse(Literal.TRUE));

CloseableIterator<FileDataReadResult> data =
parquetHandler.readParquetFiles(filesReadContextsIter, readSchema);
CloseableIterator<FileDataReadResult> data = parquetHandler.readParquetFiles(
filesReadContextsIter,
readSchemaWithoutPartitionColumns);

// TODO: Attach the selection vector associated with the file
return data.map(fileDataReadResult ->
new DataReadResult(
return data.map(fileDataReadResult -> {
ColumnarBatch updatedBatch =
PartitionUtils.withPartitionColumns(
tableClient.getExpressionHandler(),
fileDataReadResult.getData(),
Optional.empty()
)
readSchemaWithoutPartitionColumns,
Utils.getPartitionValues(fileDataReadResult.getScanFileRow()),
physicalSchema
);

String columnMappingMode = Utils.getColumnMappingMode(scanState);
switch (columnMappingMode) {
case "name":
updatedBatch = updatedBatch.withNewSchema(logicalSchema);
break;
case "none":
break;
default:
throw new UnsupportedOperationException(
"Column mapping mode is not yet supported: " + columnMappingMode);
}

return new DataReadResult(updatedBatch, Optional.empty());
}
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
/**
* Builder to construct {@link Scan} object.
*/
public interface ScanBuilder {
public interface ScanBuilder
{

/**
* Apply the given filter expression to prune any files that do not contain data satisfying
Expand All @@ -32,11 +33,8 @@ public interface ScanBuilder {
* @param tableClient {@link TableClient} instance to use in Delta Kernel.
* @param filter an {@link Expression} which evaluates to boolean.
* @return A {@link ScanBuilder} with filter applied.
*
* @throws InvalidExpressionException if the filter is not valid.
*/
ScanBuilder withFilter(TableClient tableClient, Expression filter)
throws InvalidExpressionException;
ScanBuilder withFilter(TableClient tableClient, Expression filter);

/**
* Apply the given <i>readSchema</i>. If the builder already has a projection applied, calling
Expand Down
13 changes: 7 additions & 6 deletions kernel/kernel-api/src/main/java/io/delta/kernel/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel;

import io.delta.kernel.client.TableClient;

import io.delta.kernel.internal.TableImpl;

/**
* Represents the Delta Lake table for a given path.
*/
public interface Table {

public interface Table
{
/**
* Instantiate a table object for the Delta Lake table at the given path.
*
Expand All @@ -33,8 +34,7 @@ public interface Table {
static Table forPath(String path)
throws TableNotFoundException
{
// TODO requires io.delta.kernel.internal.TableImpl
throw new UnsupportedOperationException("not implemented yet");
return TableImpl.forPath(path);
}

/**
Expand All @@ -43,5 +43,6 @@ static Table forPath(String path)
* @param tableClient {@link TableClient} instance to use in Delta Kernel.
* @return an instance of {@link Snapshot}
*/
Snapshot getLatestSnapshot(TableClient tableClient);
Snapshot getLatestSnapshot(TableClient tableClient)
throws TableNotFoundException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,25 @@ public interface ColumnVector extends AutoCloseable {
void close();

/**
* @param rowId
* @return whether the value at {@code rowId} is NULL.
*/
boolean isNullAt(int rowId);

/**
* Returns the boolean type value for {@code rowId}. The return value is undefined and can be
* anything, if the slot for {@code rowId} is null.
* @param rowId
* @return Boolean value at the given row id
*/
default boolean getBoolean(int rowId) {
throw new UnsupportedOperationException("Invalid value request for data type");
}
/**
* Returns the byte type value for {@code rowId}. The return value is undefined and can be
* anything, if the slot for {@code rowId} is null.
* @param rowId
* @return Byte value at the given row id
*/
default byte getByte(int rowId) {
throw new UnsupportedOperationException("Invalid value request for data type");
Expand All @@ -64,6 +69,8 @@ default byte getByte(int rowId) {
/**
* Returns the short type value for {@code rowId}. The return value is undefined and can be
* anything, if the slot for {@code rowId} is null.
* @param rowId
* @return Short value at the given row id
*/
default short getShort(int rowId) {
throw new UnsupportedOperationException("Invalid value request for data type");
Expand All @@ -72,6 +79,8 @@ default short getShort(int rowId) {
/**
* Returns the int type value for {@code rowId}. The return value is undefined and can be
* anything, if the slot for {@code rowId} is null.
* @param rowId
* @return Integer value at the given row id
*/
default int getInt(int rowId) {
throw new UnsupportedOperationException("Invalid value request for data type");
Expand All @@ -80,6 +89,8 @@ default int getInt(int rowId) {
/**
* Returns the long type value for {@code rowId}. The return value is undefined and can be
* anything, if the slot for {@code rowId} is null.
* @param rowId
* @return Long value at the given row id
*/
default long getLong(int rowId) {
throw new UnsupportedOperationException("Invalid value request for data type");
Expand All @@ -88,6 +99,8 @@ default long getLong(int rowId) {
/**
* Returns the float type value for {@code rowId}. The return value is undefined and can be
* anything, if the slot for {@code rowId} is null.
* @param rowId
* @return Float value at the given row id
*/
default float getFloat(int rowId) {
throw new UnsupportedOperationException("Invalid value request for data type");
Expand All @@ -96,35 +109,63 @@ default float getFloat(int rowId) {
/**
* Returns the double type value for {@code rowId}. The return value is undefined and can be
* anything, if the slot for {@code rowId} is null.
* @param rowId
* @return Double value at the given row id
*/
default double getDouble(int rowId) {
throw new UnsupportedOperationException("Invalid value request for data type");
}

/**
* Returns the binary type value for {@code rowId}. If the slot for {@code rowId} is null, it
* should return null.
* Returns the binary type value for {@code rowId}. The return value is undefined and can be
* anything, if the slot for {@code rowId} is null.
* @param rowId
* @return Binary value at the given row id
*/
default byte[] getBinary(int rowId) {
throw new UnsupportedOperationException("Invalid value request for data type");
}

/**
* Returns the string type value for {@code rowId}. If the slot for {@code rowId} is null, it
* should return null.
* Returns the string type value for {@code rowId}. The return value is undefined and can be
* anything, if the slot for {@code rowId} is null.
* @param rowId
* @return String value at the given row id
*/
default String getString(int rowId) {
throw new UnsupportedOperationException("Invalid value request for data type");
}

/**
* Return the map type value located at {@code rowId}. The return value is undefined and can be
* anything, if the slot for {@code rowId} is null.
* @param rowId
* @param <K> Return map key type
* @param <V> Return map value type
* @return
*/
default <K, V> Map<K, V> getMap(int rowId) {
throw new UnsupportedOperationException("Invalid value request for data type");
}

/**
* Return the row value located at {@code rowId}. The return value is undefined and can be
* anything, if the slot for {@code rowId} is null.
* @param rowId
* @return
*/
default Row getStruct(int rowId) {
throw new UnsupportedOperationException("Invalid value request for data type");
}

/**
* Return the array value located at {@code rowId}. The return value is undefined and can be
* anything, if the slot for {@code rowId} is null.
*
* @param rowId
* @param <T> Array element type
* @return
*/
default <T> List<T> getArray(int rowId) {
throw new UnsupportedOperationException("Invalid value request for data type");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.delta.kernel.data;

import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;

Expand All @@ -24,7 +25,8 @@
/**
* Represents zero or more rows of records with same schema type.
*/
public interface ColumnarBatch {
public interface ColumnarBatch
{
/**
* @return the schema of the data in this batch.
*/
Expand All @@ -33,6 +35,7 @@ public interface ColumnarBatch {
/**
* Return the {@link ColumnVector} for the given ordinal in the columnar batch. If the ordinal
* is not valid throws error.
*
* @param ordinal the ordinal of the column to retrieve
* @return the {@link ColumnVector} for the given ordinal in the columnar batch
*/
Expand All @@ -43,14 +46,48 @@ public interface ColumnarBatch {
*/
int getSize();

/**
* Return a copy of the {@link ColumnarBatch} with given new column vector inserted at the
* given {@code columnVector} at given {@code ordinal}. Shift the existing
* {@link ColumnVector}s located at from {@code ordinal} to the end by one position.
* The schema of the new {@link ColumnarBatch} will also be changed to reflect the newly
* inserted vector.
*
* @param ordinal
* @param columnSchema Column name and schema details of the new column vector.
* @param columnVector
* @return {@link ColumnarBatch} with new vector inserted.
* @throws IllegalArgumentException If the ordinal is not valid (ie less than zero or
* greater than the current number of vectors).
*/
default ColumnarBatch withNewColumn(int ordinal, StructField columnSchema,
ColumnVector columnVector)
{
throw new UnsupportedOperationException("Not yet implemented");
}

/**
* Generate a copy of this {@link ColumnarBatch} with the given {@code newSchema}. The data
* types of elements in the given new schema and existing schema should be the same. Rest of
* the details such as name of the column or column metadata could be different.
*
* @param newSchema
* @return {@link ColumnarBatch} with given new schema.
*/
default ColumnarBatch withNewSchema(StructType newSchema)
{
throw new UnsupportedOperationException("Not yet implemented");
}

/**
* Return a slice of the current batch.
*
* @param start Starting record index to include in the returned columnar batch
* @param end Ending record index (exclusive) to include in the returned columnar batch
* @return a columnar batch containing the records between [start, end)
*/
default ColumnarBatch slice(int start, int end) {
default ColumnarBatch slice(int start, int end)
{
throw new UnsupportedOperationException("Not yet implemented!");
}

Expand Down
Loading

0 comments on commit 01dd1ed

Please sign in to comment.