Skip to content

Commit d59fe0b

Browse files
committed
chore: bump kernel and cleanup
Signed-off-by: Robert Pack <[email protected]>
1 parent a639dea commit d59fe0b

File tree

6 files changed

+107
-36
lines changed

6 files changed

+107
-36
lines changed

crates/core/src/delta_datafusion/expr.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use datafusion_expr::expr::InList;
3636
use datafusion_expr::planner::ExprPlanner;
3737
use datafusion_expr::{AggregateUDF, Between, BinaryExpr, Cast, Expr, Like, TableSource};
3838
// Needed for MakeParquetArray
39+
use datafusion_expr::planner::{PlannerResult, RawBinaryExpr};
3940
use datafusion_expr::{ColumnarValue, Documentation, ScalarUDF, ScalarUDFImpl, Signature};
4041
use datafusion_functions::core::planner::CoreFunctionPlanner;
4142
use datafusion_sql::planner::{ContextProvider, SqlToRel};
@@ -156,7 +157,6 @@ impl Default for CustomNestedFunctionPlanner {
156157
}
157158
}
158159

159-
use datafusion_expr::planner::{PlannerResult, RawBinaryExpr};
160160
impl ExprPlanner for CustomNestedFunctionPlanner {
161161
fn plan_array_literal(
162162
&self,

crates/core/src/delta_datafusion/logical.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ impl UserDefinedLogicalNodeCore for MetricObserver {
2222
"MetricObserver"
2323
}
2424

25-
fn inputs(&self) -> Vec<&datafusion_expr::LogicalPlan> {
25+
fn inputs(&self) -> Vec<&LogicalPlan> {
2626
vec![&self.input]
2727
}
2828

@@ -50,19 +50,15 @@ impl UserDefinedLogicalNodeCore for MetricObserver {
5050
write!(f, "MetricObserver id={}", &self.id)
5151
}
5252

53-
fn from_template(
54-
&self,
55-
exprs: &[datafusion_expr::Expr],
56-
inputs: &[datafusion_expr::LogicalPlan],
57-
) -> Self {
53+
fn from_template(&self, exprs: &[datafusion_expr::Expr], inputs: &[LogicalPlan]) -> Self {
5854
self.with_exprs_and_inputs(exprs.to_vec(), inputs.to_vec())
5955
.unwrap()
6056
}
6157

6258
fn with_exprs_and_inputs(
6359
&self,
6460
_exprs: Vec<datafusion_expr::Expr>,
65-
inputs: Vec<datafusion_expr::LogicalPlan>,
61+
inputs: Vec<LogicalPlan>,
6662
) -> datafusion_common::Result<Self> {
6763
Ok(MetricObserver {
6864
id: self.id.clone(),

crates/core/src/kernel/snapshot/log_data.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,17 @@ use std::collections::HashMap;
33
use std::sync::Arc;
44

55
use arrow_array::{Array, Int32Array, Int64Array, MapArray, RecordBatch, StringArray, StructArray};
6+
use arrow_select::filter::filter_record_batch;
67
use chrono::{DateTime, Utc};
7-
use delta_kernel::expressions::Scalar;
8+
use delta_kernel::expressions::{Scalar, StructData};
9+
use delta_kernel::{Expression, ExpressionHandler};
810
use indexmap::IndexMap;
911
use object_store::path::Path;
1012
use object_store::ObjectMeta;
1113
use percent_encoding::percent_decode_str;
1214

1315
use super::super::scalars::ScalarExt;
16+
use super::super::ARROW_HANDLER;
1417
use crate::kernel::arrow::extract::{extract_and_cast, extract_and_cast_opt};
1518
use crate::kernel::{
1619
DataType, DeletionVectorDescriptor, Metadata, Remove, StructField, StructType,
@@ -338,6 +341,7 @@ pub struct FileStatsAccessor<'a> {
338341
stats: &'a StructArray,
339342
deletion_vector: Option<DeletionVector<'a>>,
340343
partition_values: &'a MapArray,
344+
partition_values_parsed: Option<&'a StructArray>,
341345
length: usize,
342346
pointer: usize,
343347
}
@@ -353,6 +357,8 @@ impl<'a> FileStatsAccessor<'a> {
353357
let modification_times = extract_and_cast::<Int64Array>(data, "add.modificationTime")?;
354358
let stats = extract_and_cast::<StructArray>(data, "add.stats_parsed")?;
355359
let partition_values = extract_and_cast::<MapArray>(data, "add.partitionValues")?;
360+
let partition_values_parsed =
361+
extract_and_cast_opt::<StructArray>(data, "add.partitionValues_parsed");
356362
let partition_fields = Arc::new(
357363
metadata
358364
.partition_columns
@@ -398,6 +404,7 @@ impl<'a> FileStatsAccessor<'a> {
398404
stats,
399405
deletion_vector,
400406
partition_values,
407+
partition_values_parsed,
401408
length: data.num_rows(),
402409
pointer: 0,
403410
})
@@ -437,6 +444,30 @@ impl<'a> Iterator for FileStatsAccessor<'a> {
437444
}
438445
}
439446

447+
pub struct LogDataIterator<'a> {
448+
data: &'a RecordBatch,
449+
pointer: usize,
450+
}
451+
452+
impl<'a> LogDataIterator<'a> {
453+
pub(crate) fn new(data: &'a RecordBatch) -> Self {
454+
Self { data, pointer: 0 }
455+
}
456+
457+
pub(crate) fn len(&self) -> usize {
458+
self.data.num_rows()
459+
}
460+
461+
pub fn path(&self) -> &str {
462+
let paths = self
463+
.data
464+
.column_by_name("path")
465+
.and_then(|c| c.as_any().downcast_ref::<StringArray>())
466+
.unwrap();
467+
paths.value(self.pointer)
468+
}
469+
}
470+
440471
/// Provides semantic access to the log data.
441472
///
442473
/// This is a helper struct that provides access to the log data in a more semantic way

crates/core/src/kernel/snapshot/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::sync::Arc;
2121

2222
use ::serde::{Deserialize, Serialize};
2323
use arrow_array::RecordBatch;
24+
use delta_kernel::Expression;
2425
use futures::stream::BoxStream;
2526
use futures::{StreamExt, TryStreamExt};
2627
use object_store::path::Path;

crates/core/src/schema/partitions.rs

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@ use std::cmp::Ordering;
33
use std::collections::HashMap;
44
use std::convert::TryFrom;
55

6-
use delta_kernel::expressions::Scalar;
6+
use delta_kernel::expressions::{ArrayData, BinaryOperator, Expression, Scalar, UnaryOperator};
7+
use delta_kernel::schema::{ArrayType, Schema};
8+
use delta_kernel::DeltaResult as KernelResult;
79
use serde::{Serialize, Serializer};
810

911
use crate::errors::DeltaTableError;
1012
use crate::kernel::{scalars::ScalarExt, DataType, PrimitiveType};
13+
use crate::DeltaResult;
1114

1215
/// A special value used in Hive to represent the null partition in partitioned tables
1316
pub const NULL_PARTITION_VALUE_DATA_PATH: &str = "__HIVE_DEFAULT_PARTITION__";
@@ -78,6 +81,53 @@ pub struct PartitionFilter {
7881
pub value: PartitionValue,
7982
}
8083

84+
fn filter_to_expression(filter: &PartitionFilter, schema: &Schema) -> DeltaResult<Expression> {
85+
let field = schema.field(&filter.key).ok_or_else(|| {
86+
DeltaTableError::generic(format!(
87+
"Partition column not defined in schema: {}",
88+
&filter.key
89+
))
90+
})?;
91+
let col = Expression::column([field.name().as_str()]);
92+
let partion_type = field.data_type().as_primitive_opt().ok_or_else(|| {
93+
DeltaTableError::InvalidPartitionFilter {
94+
partition_filter: filter.key.to_string(),
95+
}
96+
})?;
97+
let to_literal = |value: &str| -> KernelResult<_> {
98+
Ok(Expression::literal(partion_type.parse_scalar(value)?))
99+
};
100+
101+
match &filter.value {
102+
PartitionValue::Equal(value) => Ok(col.eq(to_literal(value.as_str())?)),
103+
PartitionValue::NotEqual(value) => Ok(Expression::unary(
104+
UnaryOperator::Not,
105+
col.eq(to_literal(value.as_str())?),
106+
)),
107+
PartitionValue::GreaterThan(value) => Ok(col.gt(to_literal(value.as_str())?)),
108+
PartitionValue::GreaterThanOrEqual(value) => Ok(col.gt_eq(to_literal(value.as_str())?)),
109+
PartitionValue::LessThan(value) => Ok(col.lt(to_literal(value.as_str())?)),
110+
PartitionValue::LessThanOrEqual(value) => Ok(col.lt_eq(to_literal(value.as_str())?)),
111+
PartitionValue::In(values) | PartitionValue::NotIn(values) => {
112+
let values = values
113+
.iter()
114+
.map(|v| partion_type.parse_scalar(v))
115+
.collect::<KernelResult<Vec<_>>>()?;
116+
let array = Expression::literal(Scalar::Array(ArrayData::new(
117+
ArrayType::new(field.data_type().clone(), false),
118+
values,
119+
)));
120+
match &filter.value {
121+
PartitionValue::In(_) => Ok(Expression::binary(BinaryOperator::In, col, array)),
122+
PartitionValue::NotIn(_) => {
123+
Ok(Expression::binary(BinaryOperator::NotIn, col, array))
124+
}
125+
_ => unreachable!(),
126+
}
127+
}
128+
}
129+
}
130+
81131
fn compare_typed_value(
82132
partition_value: &Scalar,
83133
filter_value: &str,
@@ -95,6 +145,10 @@ fn compare_typed_value(
95145

96146
/// Partition filter methods for filtering the DeltaTable partitions.
97147
impl PartitionFilter {
148+
pub fn to_expression(&self, schema: &Schema) -> DeltaResult<Expression> {
149+
filter_to_expression(self, schema)
150+
}
151+
98152
/// Indicates if a DeltaTable partition matches with the partition filter by key and value.
99153
pub fn match_partition(&self, partition: &DeltaTablePartition, data_type: &DataType) -> bool {
100154
if self.key != partition.key {

crates/core/src/table/state_arrow.rs

Lines changed: 15 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use std::sync::Arc;
99
use arrow_array::types::{Date32Type, TimestampMicrosecondType};
1010
use arrow_array::{
1111
Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Float64Array, Int64Array, NullArray,
12-
StringArray, StructArray, TimestampMicrosecondArray, TimestampMillisecondArray,
12+
RecordBatch, StringArray, StructArray, TimestampMicrosecondArray, TimestampMillisecondArray,
1313
};
1414
use arrow_cast::cast;
1515
use arrow_cast::parse::Parser;
@@ -23,7 +23,7 @@ use crate::kernel::{Add, DataType as DeltaDataType, StructType};
2323
use crate::protocol::{ColumnCountStat, ColumnValueStat, Stats};
2424

2525
impl DeltaTableState {
26-
/// Get an [arrow::record_batch::RecordBatch] containing add action data.
26+
/// Get a [RecordBatch] containing add action data.
2727
///
2828
/// # Arguments
2929
///
@@ -50,10 +50,7 @@ impl DeltaTableState {
5050
/// * `max.{col_name}` (matches column type): maximum value of column in file
5151
/// (if available).
5252
/// * `tag.{tag_key}` (String): value of a metadata tag for the file.
53-
pub fn add_actions_table(
54-
&self,
55-
flatten: bool,
56-
) -> Result<arrow::record_batch::RecordBatch, DeltaTableError> {
53+
pub fn add_actions_table(&self, flatten: bool) -> Result<RecordBatch, DeltaTableError> {
5754
let files = self.file_actions()?;
5855
let mut paths = arrow::array::StringBuilder::with_capacity(
5956
files.len(),
@@ -133,14 +130,14 @@ impl DeltaTableState {
133130
);
134131
}
135132

136-
Ok(arrow::record_batch::RecordBatch::try_from_iter(arrays)?)
133+
Ok(RecordBatch::try_from_iter(arrays)?)
137134
}
138135

139136
fn partition_columns_as_batch(
140137
&self,
141138
flatten: bool,
142139
files: &Vec<Add>,
143-
) -> Result<arrow::record_batch::RecordBatch, DeltaTableError> {
140+
) -> Result<RecordBatch, DeltaTableError> {
144141
let metadata = self.metadata();
145142
let column_mapping_mode = self.table_config().column_mapping_mode();
146143
let partition_column_types: Vec<arrow::datatypes::DataType> = metadata
@@ -172,7 +169,6 @@ impl DeltaTableState {
172169
.collect::<HashMap<&str, _>>();
173170

174171
validate_schema_column_mapping(self.schema(), column_mapping_mode)?;
175-
176172
let physical_name_to_logical_name = match column_mapping_mode {
177173
ColumnMappingMode::None => HashMap::with_capacity(0), // No column mapping, no need for this HashMap
178174
ColumnMappingMode::Id | ColumnMappingMode::Name => metadata
@@ -258,16 +254,14 @@ impl DeltaTableState {
258254
}
259255
};
260256

261-
Ok(arrow::record_batch::RecordBatch::try_from_iter(
262-
partition_columns,
263-
)?)
257+
Ok(RecordBatch::try_from_iter(partition_columns)?)
264258
}
265259

266260
fn tags_as_batch(
267261
&self,
268262
flatten: bool,
269263
files: &Vec<Add>,
270-
) -> Result<arrow::record_batch::RecordBatch, DeltaTableError> {
264+
) -> Result<RecordBatch, DeltaTableError> {
271265
let tag_keys: HashSet<&str> = files
272266
.iter()
273267
.flat_map(|add| add.tags.as_ref().map(|tags| tags.keys()))
@@ -306,7 +300,7 @@ impl DeltaTableState {
306300
// Sorted for consistent order
307301
arrays.sort_by(|(key1, _), (key2, _)| key1.cmp(key2));
308302
if flatten {
309-
Ok(arrow::record_batch::RecordBatch::try_from_iter(
303+
Ok(RecordBatch::try_from_iter(
310304
arrays
311305
.into_iter()
312306
.map(|(key, array)| (format!("tags.{key}"), array)),
@@ -316,7 +310,7 @@ impl DeltaTableState {
316310
.into_iter()
317311
.map(|(key, array)| (Field::new(key, array.data_type().clone(), true), array))
318312
.unzip();
319-
Ok(arrow::record_batch::RecordBatch::try_from_iter(vec![(
313+
Ok(RecordBatch::try_from_iter(vec![(
320314
"tags",
321315
Arc::new(StructArray::new(Fields::from(fields), arrays, None)) as ArrayRef,
322316
)])?)
@@ -327,7 +321,7 @@ impl DeltaTableState {
327321
&self,
328322
flatten: bool,
329323
files: &Vec<Add>,
330-
) -> Result<arrow::record_batch::RecordBatch, DeltaTableError> {
324+
) -> Result<RecordBatch, DeltaTableError> {
331325
let capacity = files.len();
332326
let mut storage_type = arrow::array::StringBuilder::with_capacity(capacity, 1);
333327
let mut path_or_inline_div = arrow::array::StringBuilder::with_capacity(capacity, 64);
@@ -355,7 +349,7 @@ impl DeltaTableState {
355349
}
356350
}
357351
if flatten {
358-
Ok(arrow::record_batch::RecordBatch::try_from_iter(vec![
352+
Ok(RecordBatch::try_from_iter(vec![
359353
(
360354
"deletionVector.storageType",
361355
Arc::new(storage_type.finish()) as ArrayRef,
@@ -378,7 +372,7 @@ impl DeltaTableState {
378372
),
379373
])?)
380374
} else {
381-
Ok(arrow::record_batch::RecordBatch::try_from_iter(vec![(
375+
Ok(RecordBatch::try_from_iter(vec![(
382376
"deletionVector",
383377
Arc::new(StructArray::new(
384378
Fields::from(vec![
@@ -401,10 +395,7 @@ impl DeltaTableState {
401395
}
402396
}
403397

404-
fn stats_as_batch(
405-
&self,
406-
flatten: bool,
407-
) -> Result<arrow::record_batch::RecordBatch, DeltaTableError> {
398+
fn stats_as_batch(&self, flatten: bool) -> Result<RecordBatch, DeltaTableError> {
408399
let stats: Vec<Option<Stats>> = self
409400
.file_actions_iter()?
410401
.map(|f| {
@@ -628,9 +619,7 @@ impl DeltaTableState {
628619
}
629620
}
630621

631-
Ok(arrow::record_batch::RecordBatch::try_from_iter(
632-
out_columns,
633-
)?)
622+
Ok(RecordBatch::try_from_iter(out_columns)?)
634623
}
635624
}
636625

@@ -689,7 +678,7 @@ impl<'a> SchemaLeafIterator<'a> {
689678
}
690679
}
691680

692-
impl<'a> std::iter::Iterator for SchemaLeafIterator<'a> {
681+
impl<'a> Iterator for SchemaLeafIterator<'a> {
693682
type Item = (Vec<&'a str>, &'a DeltaDataType);
694683

695684
fn next(&mut self) -> Option<Self::Item> {

0 commit comments

Comments
 (0)