Merge partition columns into scan statistics for data skipping #615

Draft: wants to merge 2 commits into base: main
171 changes: 169 additions & 2 deletions kernel/src/scan/data_skipping.rs
@@ -1,11 +1,14 @@
use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, LazyLock};

use arrow_array::{Array, ArrayRef, MapArray, RecordBatch, StringArray, StructArray};
use tracing::debug;

use crate::actions::get_log_add_schema;
use crate::actions::visitors::SelectionVectorVisitor;
use crate::engine::arrow_data::ArrowEngineData;
use crate::error::DeltaResult;
use crate::expressions::{
column_expr, joined_column_expr, BinaryOperator, ColumnName, Expression as Expr, ExpressionRef,
@@ -14,8 +17,10 @@ use crate::expressions::{
use crate::predicates::{
DataSkippingPredicateEvaluator, PredicateEvaluator, PredicateEvaluatorDefaults,
};
use crate::schema::{DataType, PrimitiveType, SchemaRef, SchemaTransform, StructField, StructType};
use crate::{Engine, EngineData, ExpressionEvaluator, JsonHandler, RowVisitor as _};
use crate::schema::{
DataType, MapType, PrimitiveType, SchemaRef, SchemaTransform, StructField, StructType,
};
use crate::{Engine, EngineData, Error, ExpressionEvaluator, JsonHandler, RowVisitor as _};

#[cfg(test)]
mod tests;
@@ -43,6 +48,7 @@ fn as_data_skipping_predicate(expr: &Expr, inverted: bool) -> Option<Expr> {
pub(crate) struct DataSkippingFilter {
stats_schema: SchemaRef,
select_stats_evaluator: Arc<dyn ExpressionEvaluator>,
partitions_evaluator: Arc<dyn ExpressionEvaluator>,
skipping_evaluator: Arc<dyn ExpressionEvaluator>,
filter_evaluator: Arc<dyn ExpressionEvaluator>,
json_handler: Arc<dyn JsonHandler>,
@@ -61,6 +67,8 @@ impl DataSkippingFilter {
static PREDICATE_SCHEMA: LazyLock<DataType> = LazyLock::new(|| {
DataType::struct_type([StructField::new("predicate", DataType::BOOLEAN, true)])
});
static PARTITIONS_EXPR: LazyLock<Expr> =
LazyLock::new(|| column_expr!("add.partitionValues"));
static STATS_EXPR: LazyLock<Expr> = LazyLock::new(|| column_expr!("add.stats"));
static FILTER_EXPR: LazyLock<Expr> =
LazyLock::new(|| column_expr!("predicate").distinct(false));
@@ -88,6 +96,8 @@ impl DataSkippingFilter {
StructField::new("maxValues", referenced_schema, true),
]));

let partitions_map_type = MapType::new(DataType::STRING, DataType::STRING, true);

// Skipping happens in several steps:
//
// 1. The stats selector fetches add.stats from the metadata
@@ -106,6 +116,12 @@
DataType::STRING,
);

let partitions_evaluator = engine.get_expression_handler().get_evaluator(
get_log_add_schema().clone(),
PARTITIONS_EXPR.clone(),
partitions_map_type.into(),
);

let skipping_evaluator = engine.get_expression_handler().get_evaluator(
stats_schema.clone(),
Expr::struct_from([as_data_skipping_predicate(&predicate, false)?]),
@@ -121,6 +137,7 @@
Some(Self {
stats_schema,
select_stats_evaluator,
partitions_evaluator,
skipping_evaluator,
filter_evaluator,
json_handler: engine.get_json_handler(),
@@ -138,6 +155,11 @@
.parse_json(stats, self.stats_schema.clone())?;
assert_eq!(parsed_stats.len(), actions.len());

let parsed_partitions = self.partitions_evaluator.evaluate(actions)?;
assert_eq!(parsed_partitions.len(), actions.len());

let parsed_stats = merge_partitions_into_stats(parsed_partitions, parsed_stats)?;

// evaluate the predicate on the parsed stats, then convert to selection vector
let skipping_predicate = self.skipping_evaluator.evaluate(&*parsed_stats)?;
assert_eq!(skipping_predicate.len(), actions.len());
@@ -257,3 +279,148 @@ impl DataSkippingPredicateEvaluator for DataSkippingPredicateCreator {
Some(Expr::variadic(op, exprs))
}
}

/// This function computes, for each partition column, the array of values to merge into
/// the stats fields that follow. Since the partition values arrive as a MapArray, we need
/// to look up the value assigned to each key for every add action.
fn compute_partition_arrays(
partitions_column: &ArrayRef,
output_schema: &Arc<arrow_schema::Schema>,
) -> DeltaResult<HashMap<String, ArrayRef>> {
let output_types: HashMap<String, arrow_schema::DataType> =
match output_schema.field_with_name("minValues")?.data_type() {
arrow_schema::DataType::Struct(fields) => fields
.iter()
.map(|field| (field.name().to_owned(), field.data_type().to_owned())),
_ => return Err(Error::engine_data_type("minValues")),
}
.collect();

let partitions_array = partitions_column
.as_any()
.downcast_ref::<MapArray>()
.ok_or_else(|| Error::engine_data_type("Partitions"))?;

let keys: HashSet<String> = partitions_array
.keys()
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| Error::engine_data_type("Partition keys"))?
.iter()
.filter_map(|s| s.map(|t| t.to_string()))
.collect();

let partition_values: HashMap<String, _> = keys
.iter()
.filter_map(|key| {
let cast_type = output_types.get(key)?;

let values = partitions_array
.iter()
.map(|maybe_partition| {
maybe_partition.and_then(|partition_data| {
let keys = partition_data
.column_by_name("key")?
.as_any()
.downcast_ref::<StringArray>()?;
let values = partition_data
.column_by_name("value")?
.as_any()
.downcast_ref::<StringArray>()?;

let mut kv =
keys.iter()
.zip(values.iter())
.filter_map(|(k, v)| match (k, v) {
(Some(k), Some(v)) => Some((k, v)),
_ => None,
});

kv.find(|(k, _)| *k == key.as_str())
.map(|(_, v)| v.to_string())
Comment on lines +331 to +340
@scovich (Collaborator) commented on Dec 30, 2024:

Why not just:

Suggested change (replace):

    let mut kv =
        keys.iter()
            .zip(values.iter())
            .filter_map(|(k, v)| match (k, v) {
                (Some(k), Some(v)) => Some((k, v)),
                _ => None,
            });
    kv.find(|(k, _)| *k == key.as_str())
        .map(|(_, v)| v.to_string())

with:

    keys.iter()
        .zip(values.iter())
        .filter_map(|(k, v)| match (k, v) {
            (Some(k), Some(v)) if *k == key.as_str() => Some(v.to_string()),
            _ => None,
        })
        .last()

(Have to return the last match because arrow and parquet maps are allowed to have duplicate keys; the last key wins.)
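
The last-key-wins behavior is easy to check outside arrow. Below is a minimal, self-contained sketch of the suggested lookup, with plain slices of Option pairs standing in for the MapArray's key/value arrays; lookup_last is a hypothetical helper for illustration, not kernel code.

    // Sketch only: plain slices stand in for arrow's key/value arrays.
    fn lookup_last(pairs: &[(Option<&str>, Option<&str>)], key: &str) -> Option<String> {
        pairs
            .iter()
            .filter_map(|(k, v)| match (k, v) {
                // Keep only non-null entries whose key matches.
                (Some(k), Some(v)) if *k == key => Some(v.to_string()),
                _ => None,
            })
            // Duplicate keys are legal in arrow/parquet maps; the last match wins.
            .last()
    }

    fn main() {
        let pairs = [
            (Some("year"), Some("2023")),
            (Some("year"), Some("2024")), // duplicate key: this value wins
            (None, Some("ignored")),      // null keys are skipped
        ];
        assert_eq!(lookup_last(&pairs, "year"), Some("2024".to_string()));
        assert_eq!(lookup_last(&pairs, "month"), None);
    }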

})
})
.collect::<Vec<Option<String>>>();
Collaborator comment:

nit: use Itertools::collect_vec to avoid the turbofish
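
As a quick illustration of the nit, assuming the itertools crate (already a dependency here, given the itertools::enumerate calls below), collect_vec is a drop-in replacement for the turbofish collect:

    use itertools::Itertools;

    fn main() {
        // `.collect_vec()` is shorthand for `.collect::<Vec<_>>()`.
        let with_turbofish = (1..=4).map(|x| x * x).collect::<Vec<_>>();
        let without = (1..=4).map(|x| x * x).collect_vec();
        assert_eq!(with_turbofish, without);
    }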


let string_array = StringArray::from(values);
let value_array = arrow_cast::cast(&string_array, &cast_type).ok()?;
Some((key.to_owned(), value_array))
})
.collect();

Ok(partition_values)
}

/// This function builds up the stats fields for the min and max values. It assumes
/// that the arrays for the partition fields already exist, and it only builds those
/// that match the predicate filters.
fn merge_partition_fields_into_stats(
stats_batch: &RecordBatch,
idx: usize,
partition_values: &HashMap<String, Arc<dyn Array>>,
) -> DeltaResult<Arc<dyn arrow_array::Array + 'static>> {
let (fields, mut arrays, nulls) = stats_batch
.column(idx)
.as_any()
.downcast_ref::<StructArray>()
.ok_or_else(|| Error::engine_data_type("minValues"))?
Collaborator comment:

Doesn't this method also get called for maxValues?

.to_owned()
.into_parts();
for (idx, field) in itertools::enumerate(fields.iter()) {
if let Some(arr) = partition_values.get(field.name()) {
arrays[idx] = Arc::clone(arr);
}
}
Ok(Arc::new(StructArray::new(fields, arrays, nulls))
as Arc<(dyn arrow_array::Array + 'static)>)
Comment on lines +374 to +375
Collaborator comment:

Is the cast necessary? I thought Arc decayed?

Suggested change (replace):

    Ok(Arc::new(StructArray::new(fields, arrays, nulls))
        as Arc<(dyn arrow_array::Array + 'static)>)

with:

    let array = Arc::new(StructArray::new(fields, arrays, nulls));
    Ok(array)

or, if the cast is unavoidable:

    Ok(Arc::new(StructArray::new(fields, arrays, nulls)) as _)
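
The cast question comes down to unsized coercion: Arc<Concrete> converts to Arc<dyn Trait> implicitly at coercion sites such as a function's return position. A small self-contained illustration (the Shape/Circle names are made up for the example):

    use std::sync::Arc;

    trait Shape {
        fn area(&self) -> f64;
    }

    struct Circle {
        radius: f64,
    }

    impl Shape for Circle {
        fn area(&self) -> f64 {
            std::f64::consts::PI * self.radius * self.radius
        }
    }

    // The declared return type is a coercion site: Arc<Circle> becomes
    // Arc<dyn Shape> with no explicit `as` cast.
    fn make_shape() -> Arc<dyn Shape> {
        let shape = Arc::new(Circle { radius: 1.0 });
        shape
    }

    fn main() {
        assert!((make_shape().area() - std::f64::consts::PI).abs() < 1e-12);
    }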

}

/// This function adds partition data to the stats fields. For each partition field of an
/// add action, it adds the partition value to both the `minValues` and `maxValues` fields,
/// so that the data skipping filters can match against it and effectively skip files.
fn merge_partitions_into_stats(
partitions: Box<dyn EngineData>,
stats: Box<dyn EngineData>,
) -> DeltaResult<Box<dyn EngineData>> {
let partitions = ArrowEngineData::try_from_engine_data(partitions)?;
let partitions_batch = partitions.record_batch();

// If the partitions data is empty, return the original stats
let partitions_column = match partitions_batch.column_by_name("output") {
Some(c) => c,
None => return Ok(stats),
};
Comment on lines +389 to +392
Collaborator comment:

nit:

Suggested change (replace):

    let partitions_column = match partitions_batch.column_by_name("output") {
        Some(c) => c,
        None => return Ok(stats),
    };

with:

    let Some(partitions_column) = partitions_batch.column_by_name("output") else {
        return Ok(stats);
    };


let stats = ArrowEngineData::try_from_engine_data(stats)?;
let stats_batch = stats.record_batch();
let output_schema = stats_batch.schema();

// For each unique partition key, generate the associated array
// to add to the stats fields
let partition_values = compute_partition_arrays(partitions_column, &output_schema)?;
if partition_values.is_empty() {
return Ok(stats);
}

let mut columns = Vec::default();
for (idx, field) in itertools::enumerate(output_schema.fields()) {
match field.name().as_str() {
"minValues" => columns.push(merge_partition_fields_into_stats(
stats_batch,
idx,
&partition_values,
)?),
"maxValues" => columns.push(merge_partition_fields_into_stats(
stats_batch,
idx,
&partition_values,
)?),
_ => {
columns.push(Arc::clone(stats_batch.column(idx)));
}
Comment on lines +407 to +420
Collaborator comment:

nit:

Suggested change (replace):

    match field.name().as_str() {
        "minValues" => columns.push(merge_partition_fields_into_stats(
            stats_batch,
            idx,
            &partition_values,
        )?),
        "maxValues" => columns.push(merge_partition_fields_into_stats(
            stats_batch,
            idx,
            &partition_values,
        )?),
        _ => {
            columns.push(Arc::clone(stats_batch.column(idx)));
        }

with:

    let column = match field.name().as_str() {
        "minValues" | "maxValues" => merge_partition_fields_into_stats(
            stats_batch,
            idx,
            &partition_values,
        )?,
        _ => Arc::clone(stats_batch.column(idx)),
    };
    columns.push(column);

I don't understand what this code is doing, though. What should happen if a stat other than min or max is requested for a partition column?

Collaborator follow-up:

Update: Figured it out -- partition values only get merged in for min/max stats; other stats remain unchanged.
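
To pin down those semantics, here is a hedged sketch of the merge with plain HashMaps standing in for the arrow arrays (the merge_partition_values helper and the integer values are illustrative only): the partition value is written into both minValues and maxValues, so a partition column behaves like a data column whose min and max coincide, and the existing skipping predicate needs no special casing.

    use std::collections::HashMap;

    type Stats = HashMap<String, HashMap<String, i64>>;

    // Sketch only: copy each partition value into both minValues and maxValues;
    // other stats (numRecords, nullCount, ...) are left untouched.
    fn merge_partition_values(stats: &mut Stats, partitions: &HashMap<String, i64>) {
        for field in ["minValues", "maxValues"] {
            let entry = stats.entry(field.to_string()).or_default();
            for (col, value) in partitions {
                entry.insert(col.clone(), *value);
            }
        }
    }

    fn main() {
        // Stats parsed from add.stats: {"minValues":{"id":4},"maxValues":{"id":6}}
        let mut stats: Stats = HashMap::from([
            ("minValues".to_string(), HashMap::from([("id".to_string(), 4)])),
            ("maxValues".to_string(), HashMap::from([("id".to_string(), 6)])),
        ]);
        // add.partitionValues for the same file, already cast to the column type.
        let partitions = HashMap::from([("partition_col".to_string(), 1)]);

        merge_partition_values(&mut stats, &partitions);

        // `partition_col <= 2` now evaluates against minValues like any other column.
        assert_eq!(stats["minValues"]["partition_col"], 1);
        assert_eq!(stats["maxValues"]["partition_col"], 1);
    }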

}
}

let record_batch = RecordBatch::try_new(output_schema, columns)?;
Ok(Box::new(ArrowEngineData::new(record_batch)))
}
33 changes: 32 additions & 1 deletion kernel/src/table_changes/log_replay/tests.rs
@@ -25,6 +25,7 @@ fn get_schema() -> StructType {
StructType::new([
StructField::new("id", DataType::INTEGER, true),
StructField::new("value", DataType::STRING, true),
StructField::new("partition_col", DataType::INTEGER, true),
])
}

@@ -490,6 +491,9 @@
}),
Action::Add(Add {
path: "fake_path_1".into(),
partition_values: HashMap::from([
("partition_col".to_string(), "1".to_string()),
]),
stats: Some("{\"numRecords\":4,\"minValues\":{\"id\":4},\"maxValues\":{\"id\":6},\"nullCount\":{\"id\":3}}".into()),
data_change: true,
deletion_vector: deletion_vector.clone(),
@@ -503,6 +507,9 @@
}),
Action::Add(Add {
path: "fake_path_2".into(),
partition_values: HashMap::from([
("partition_col".to_string(), "2".to_string()),
]),
stats: Some("{\"numRecords\":4,\"minValues\":{\"id\":4},\"maxValues\":{\"id\":4},\"nullCount\":{\"id\":3}}".into()),
data_change: true,
deletion_vector,
@@ -511,6 +518,9 @@
// Add action with max value id = 5
Action::Add(Add {
path: "fake_path_3".into(),
partition_values: HashMap::from([
("partition_col".to_string(), "3".to_string()),
]),
stats: Some("{\"numRecords\":4,\"minValues\":{\"id\":4},\"maxValues\":{\"id\":5},\"nullCount\":{\"id\":3}}".into()),
data_change: true,
..Default::default()
@@ -533,7 +543,7 @@
.unwrap()
.into_iter();

let sv = table_changes_action_iter(engine, commits, logical_schema.into(), predicate)
let sv = table_changes_action_iter(engine.clone(), commits.clone(), logical_schema.clone().into(), predicate)
.unwrap()
.flat_map(|scan_data| {
let scan_data = scan_data.unwrap();
Expand All @@ -543,6 +553,27 @@ async fn data_skipping_filter() {

// Note: since the first pair is a dv operation, remove action will always be filtered
assert_eq!(sv, &[false, true, false, false, true]);

let predicate = Expression::binary(
BinaryOperator::LessThanOrEqual,
column_expr!("partition_col"),
Scalar::from(2),
);
let predicate = match PhysicalPredicate::try_new(&predicate, &logical_schema) {
Ok(PhysicalPredicate::Some(p, s)) => Some((p, s)),
other => panic!("Unexpected result: {:?}", other),
};

let sv = table_changes_action_iter(engine, commits, logical_schema.into(), predicate)
.unwrap()
.flat_map(|scan_data| {
let scan_data = scan_data.unwrap();
scan_data.selection_vector
})
.collect_vec();

// Note: since the first pair is a dv operation, remove action will always be filtered
assert_eq!(sv, &[false, true, false, true, false]);
}

#[tokio::test]