-
Notifications
You must be signed in to change notification settings - Fork 54
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merge partition columns into scan statistics for data skipping #615
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -1,11 +1,14 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||
use std::borrow::Cow; | ||||||||||||||||||||||||||||||||||||||||||||||||
use std::cmp::Ordering; | ||||||||||||||||||||||||||||||||||||||||||||||||
use std::collections::{HashMap, HashSet}; | ||||||||||||||||||||||||||||||||||||||||||||||||
use std::sync::{Arc, LazyLock}; | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
use arrow_array::{Array, ArrayRef, MapArray, RecordBatch, StringArray, StructArray}; | ||||||||||||||||||||||||||||||||||||||||||||||||
use tracing::debug; | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
use crate::actions::get_log_add_schema; | ||||||||||||||||||||||||||||||||||||||||||||||||
use crate::actions::visitors::SelectionVectorVisitor; | ||||||||||||||||||||||||||||||||||||||||||||||||
use crate::engine::arrow_data::ArrowEngineData; | ||||||||||||||||||||||||||||||||||||||||||||||||
use crate::error::DeltaResult; | ||||||||||||||||||||||||||||||||||||||||||||||||
use crate::expressions::{ | ||||||||||||||||||||||||||||||||||||||||||||||||
column_expr, joined_column_expr, BinaryOperator, ColumnName, Expression as Expr, ExpressionRef, | ||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -14,8 +17,10 @@ use crate::expressions::{ | |||||||||||||||||||||||||||||||||||||||||||||||
use crate::predicates::{ | ||||||||||||||||||||||||||||||||||||||||||||||||
DataSkippingPredicateEvaluator, PredicateEvaluator, PredicateEvaluatorDefaults, | ||||||||||||||||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||||||||||||||||
use crate::schema::{DataType, PrimitiveType, SchemaRef, SchemaTransform, StructField, StructType}; | ||||||||||||||||||||||||||||||||||||||||||||||||
use crate::{Engine, EngineData, ExpressionEvaluator, JsonHandler, RowVisitor as _}; | ||||||||||||||||||||||||||||||||||||||||||||||||
use crate::schema::{ | ||||||||||||||||||||||||||||||||||||||||||||||||
DataType, MapType, PrimitiveType, SchemaRef, SchemaTransform, StructField, StructType, | ||||||||||||||||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||||||||||||||||
use crate::{Engine, EngineData, Error, ExpressionEvaluator, JsonHandler, RowVisitor as _}; | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
#[cfg(test)] | ||||||||||||||||||||||||||||||||||||||||||||||||
mod tests; | ||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -43,6 +48,7 @@ fn as_data_skipping_predicate(expr: &Expr, inverted: bool) -> Option<Expr> { | |||||||||||||||||||||||||||||||||||||||||||||||
pub(crate) struct DataSkippingFilter { | ||||||||||||||||||||||||||||||||||||||||||||||||
stats_schema: SchemaRef, | ||||||||||||||||||||||||||||||||||||||||||||||||
select_stats_evaluator: Arc<dyn ExpressionEvaluator>, | ||||||||||||||||||||||||||||||||||||||||||||||||
partitions_evaluator: Arc<dyn ExpressionEvaluator>, | ||||||||||||||||||||||||||||||||||||||||||||||||
skipping_evaluator: Arc<dyn ExpressionEvaluator>, | ||||||||||||||||||||||||||||||||||||||||||||||||
filter_evaluator: Arc<dyn ExpressionEvaluator>, | ||||||||||||||||||||||||||||||||||||||||||||||||
json_handler: Arc<dyn JsonHandler>, | ||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -61,6 +67,8 @@ impl DataSkippingFilter { | |||||||||||||||||||||||||||||||||||||||||||||||
static PREDICATE_SCHEMA: LazyLock<DataType> = LazyLock::new(|| { | ||||||||||||||||||||||||||||||||||||||||||||||||
DataType::struct_type([StructField::new("predicate", DataType::BOOLEAN, true)]) | ||||||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||||||
static PARITIONS_EXPR: LazyLock<Expr> = | ||||||||||||||||||||||||||||||||||||||||||||||||
LazyLock::new(|| column_expr!("add.partitionValues")); | ||||||||||||||||||||||||||||||||||||||||||||||||
static STATS_EXPR: LazyLock<Expr> = LazyLock::new(|| column_expr!("add.stats")); | ||||||||||||||||||||||||||||||||||||||||||||||||
static FILTER_EXPR: LazyLock<Expr> = | ||||||||||||||||||||||||||||||||||||||||||||||||
LazyLock::new(|| column_expr!("predicate").distinct(false)); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -88,6 +96,8 @@ impl DataSkippingFilter { | |||||||||||||||||||||||||||||||||||||||||||||||
StructField::new("maxValues", referenced_schema, true), | ||||||||||||||||||||||||||||||||||||||||||||||||
])); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
let partitions_map_type = MapType::new(DataType::STRING, DataType::STRING, true); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
// Skipping happens in several steps: | ||||||||||||||||||||||||||||||||||||||||||||||||
// | ||||||||||||||||||||||||||||||||||||||||||||||||
// 1. The stats selector fetches add.stats from the metadata | ||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -106,6 +116,12 @@ impl DataSkippingFilter { | |||||||||||||||||||||||||||||||||||||||||||||||
DataType::STRING, | ||||||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
let partitions_evaluator = engine.get_expression_handler().get_evaluator( | ||||||||||||||||||||||||||||||||||||||||||||||||
get_log_add_schema().clone(), | ||||||||||||||||||||||||||||||||||||||||||||||||
PARITIONS_EXPR.clone(), | ||||||||||||||||||||||||||||||||||||||||||||||||
partitions_map_type.into(), | ||||||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
let skipping_evaluator = engine.get_expression_handler().get_evaluator( | ||||||||||||||||||||||||||||||||||||||||||||||||
stats_schema.clone(), | ||||||||||||||||||||||||||||||||||||||||||||||||
Expr::struct_from([as_data_skipping_predicate(&predicate, false)?]), | ||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -121,6 +137,7 @@ impl DataSkippingFilter { | |||||||||||||||||||||||||||||||||||||||||||||||
Some(Self { | ||||||||||||||||||||||||||||||||||||||||||||||||
stats_schema, | ||||||||||||||||||||||||||||||||||||||||||||||||
select_stats_evaluator, | ||||||||||||||||||||||||||||||||||||||||||||||||
partitions_evaluator, | ||||||||||||||||||||||||||||||||||||||||||||||||
skipping_evaluator, | ||||||||||||||||||||||||||||||||||||||||||||||||
filter_evaluator, | ||||||||||||||||||||||||||||||||||||||||||||||||
json_handler: engine.get_json_handler(), | ||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -138,6 +155,11 @@ impl DataSkippingFilter { | |||||||||||||||||||||||||||||||||||||||||||||||
.parse_json(stats, self.stats_schema.clone())?; | ||||||||||||||||||||||||||||||||||||||||||||||||
assert_eq!(parsed_stats.len(), actions.len()); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
let parsed_partitions = self.partitions_evaluator.evaluate(actions)?; | ||||||||||||||||||||||||||||||||||||||||||||||||
assert_eq!(parsed_partitions.len(), actions.len()); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
let parsed_stats = merge_partitions_into_stats(parsed_partitions, parsed_stats)?; | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
// evaluate the predicate on the parsed stats, then convert to selection vector | ||||||||||||||||||||||||||||||||||||||||||||||||
let skipping_predicate = self.skipping_evaluator.evaluate(&*parsed_stats)?; | ||||||||||||||||||||||||||||||||||||||||||||||||
assert_eq!(skipping_predicate.len(), actions.len()); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -257,3 +279,148 @@ impl DataSkippingPredicateEvaluator for DataSkippingPredicateCreator { | |||||||||||||||||||||||||||||||||||||||||||||||
Some(Expr::variadic(op, exprs)) | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
/// This function computes the values for the partition arrays that are added to the stats | ||||||||||||||||||||||||||||||||||||||||||||||||
/// fields to follow. Since the partition columns are a MapArray, we need to find for each | ||||||||||||||||||||||||||||||||||||||||||||||||
/// key the value assigned for each log. | ||||||||||||||||||||||||||||||||||||||||||||||||
fn compute_partition_arrays( | ||||||||||||||||||||||||||||||||||||||||||||||||
partitions_column: &ArrayRef, | ||||||||||||||||||||||||||||||||||||||||||||||||
output_schema: &Arc<arrow_schema::Schema>, | ||||||||||||||||||||||||||||||||||||||||||||||||
) -> DeltaResult<HashMap<String, ArrayRef>> { | ||||||||||||||||||||||||||||||||||||||||||||||||
let output_types: HashMap<String, arrow_schema::DataType> = | ||||||||||||||||||||||||||||||||||||||||||||||||
match output_schema.field_with_name("minValues")?.data_type() { | ||||||||||||||||||||||||||||||||||||||||||||||||
arrow_schema::DataType::Struct(fields) => fields | ||||||||||||||||||||||||||||||||||||||||||||||||
.iter() | ||||||||||||||||||||||||||||||||||||||||||||||||
.map(|field| (field.name().to_owned(), field.data_type().to_owned())), | ||||||||||||||||||||||||||||||||||||||||||||||||
_ => return Err(Error::engine_data_type("minValues")), | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
.collect(); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
let partitions_array = partitions_column | ||||||||||||||||||||||||||||||||||||||||||||||||
.as_any() | ||||||||||||||||||||||||||||||||||||||||||||||||
.downcast_ref::<MapArray>() | ||||||||||||||||||||||||||||||||||||||||||||||||
.ok_or_else(|| Error::engine_data_type("Partitions"))?; | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
let keys: HashSet<String> = partitions_array | ||||||||||||||||||||||||||||||||||||||||||||||||
.keys() | ||||||||||||||||||||||||||||||||||||||||||||||||
.as_any() | ||||||||||||||||||||||||||||||||||||||||||||||||
.downcast_ref::<StringArray>() | ||||||||||||||||||||||||||||||||||||||||||||||||
.ok_or_else(|| Error::engine_data_type("Partition keys"))? | ||||||||||||||||||||||||||||||||||||||||||||||||
.iter() | ||||||||||||||||||||||||||||||||||||||||||||||||
.filter_map(|s| s.map(|t| t.to_string())) | ||||||||||||||||||||||||||||||||||||||||||||||||
.collect(); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
let partition_values: HashMap<String, _> = keys | ||||||||||||||||||||||||||||||||||||||||||||||||
.iter() | ||||||||||||||||||||||||||||||||||||||||||||||||
.filter_map(|key| { | ||||||||||||||||||||||||||||||||||||||||||||||||
let cast_type = output_types.get(key)?; | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
let values = partitions_array | ||||||||||||||||||||||||||||||||||||||||||||||||
.iter() | ||||||||||||||||||||||||||||||||||||||||||||||||
.map(|maybe_partition| { | ||||||||||||||||||||||||||||||||||||||||||||||||
maybe_partition.and_then(|partition_data| { | ||||||||||||||||||||||||||||||||||||||||||||||||
let keys = partition_data | ||||||||||||||||||||||||||||||||||||||||||||||||
.column_by_name("key")? | ||||||||||||||||||||||||||||||||||||||||||||||||
.as_any() | ||||||||||||||||||||||||||||||||||||||||||||||||
.downcast_ref::<StringArray>()?; | ||||||||||||||||||||||||||||||||||||||||||||||||
let values = partition_data | ||||||||||||||||||||||||||||||||||||||||||||||||
.column_by_name("value")? | ||||||||||||||||||||||||||||||||||||||||||||||||
.as_any() | ||||||||||||||||||||||||||||||||||||||||||||||||
.downcast_ref::<StringArray>()?; | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
let mut kv = | ||||||||||||||||||||||||||||||||||||||||||||||||
keys.iter() | ||||||||||||||||||||||||||||||||||||||||||||||||
.zip(values.iter()) | ||||||||||||||||||||||||||||||||||||||||||||||||
.filter_map(|(k, v)| match (k, v) { | ||||||||||||||||||||||||||||||||||||||||||||||||
(Some(k), Some(v)) => Some((k, v)), | ||||||||||||||||||||||||||||||||||||||||||||||||
_ => None, | ||||||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
kv.find(|(k, _)| *k == key.as_str()) | ||||||||||||||||||||||||||||||||||||||||||||||||
.map(|(_, v)| v.to_string()) | ||||||||||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||||||||||
.collect::<Vec<Option<String>>>(); | ||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: use |
||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
let string_array = StringArray::from(values); | ||||||||||||||||||||||||||||||||||||||||||||||||
let value_array = arrow_cast::cast(&string_array, &cast_type).ok()?; | ||||||||||||||||||||||||||||||||||||||||||||||||
Some((key.to_owned(), value_array)) | ||||||||||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||||||||||
.collect(); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
Ok(partition_values) | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
/// This funtion builds up the stats fields for the min and max values. It assumes | ||||||||||||||||||||||||||||||||||||||||||||||||
/// that the arrays for the partition fields already exist. It will only build those | ||||||||||||||||||||||||||||||||||||||||||||||||
/// that match the predicate filters. | ||||||||||||||||||||||||||||||||||||||||||||||||
fn merge_partition_fields_into_stats( | ||||||||||||||||||||||||||||||||||||||||||||||||
stats_batch: &RecordBatch, | ||||||||||||||||||||||||||||||||||||||||||||||||
idx: usize, | ||||||||||||||||||||||||||||||||||||||||||||||||
partition_values: &HashMap<String, Arc<dyn Array>>, | ||||||||||||||||||||||||||||||||||||||||||||||||
) -> DeltaResult<Arc<dyn arrow_array::Array + 'static>> { | ||||||||||||||||||||||||||||||||||||||||||||||||
let (fields, mut arrays, nulls) = stats_batch | ||||||||||||||||||||||||||||||||||||||||||||||||
.column(idx) | ||||||||||||||||||||||||||||||||||||||||||||||||
.as_any() | ||||||||||||||||||||||||||||||||||||||||||||||||
.downcast_ref::<StructArray>() | ||||||||||||||||||||||||||||||||||||||||||||||||
.ok_or_else(|| Error::engine_data_type("minValues"))? | ||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doesn't this method also get called for |
||||||||||||||||||||||||||||||||||||||||||||||||
.to_owned() | ||||||||||||||||||||||||||||||||||||||||||||||||
.into_parts(); | ||||||||||||||||||||||||||||||||||||||||||||||||
for (idx, field) in itertools::enumerate(fields.iter()) { | ||||||||||||||||||||||||||||||||||||||||||||||||
if let Some(arr) = partition_values.get(field.name()) { | ||||||||||||||||||||||||||||||||||||||||||||||||
arrays[idx] = Arc::clone(arr); | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
Ok(Arc::new(StructArray::new(fields, arrays, nulls)) | ||||||||||||||||||||||||||||||||||||||||||||||||
as Arc<(dyn arrow_array::Array + 'static)>) | ||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+374
to
+375
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the cast necessary? I thought Arc decayed?
Suggested change
or, if the cast is unavoidable:
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
/// This function adds partition data to the stats fields. For each partition field for a log | ||||||||||||||||||||||||||||||||||||||||||||||||
/// it adds the partition value to both the `minValues` and `maxValues` fields, so that when | ||||||||||||||||||||||||||||||||||||||||||||||||
/// we match against it with the data skipping filters we can effectively skip files. | ||||||||||||||||||||||||||||||||||||||||||||||||
fn merge_partitions_into_stats( | ||||||||||||||||||||||||||||||||||||||||||||||||
partitions: Box<dyn EngineData>, | ||||||||||||||||||||||||||||||||||||||||||||||||
stats: Box<dyn EngineData>, | ||||||||||||||||||||||||||||||||||||||||||||||||
) -> DeltaResult<Box<dyn EngineData>> { | ||||||||||||||||||||||||||||||||||||||||||||||||
let partitions = ArrowEngineData::try_from_engine_data(partitions)?; | ||||||||||||||||||||||||||||||||||||||||||||||||
let partitions_batch = partitions.record_batch(); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
// If the struct is partitions data is emtpy, return the original stats | ||||||||||||||||||||||||||||||||||||||||||||||||
let partitions_column = match partitions_batch.column_by_name("output") { | ||||||||||||||||||||||||||||||||||||||||||||||||
Some(c) => c, | ||||||||||||||||||||||||||||||||||||||||||||||||
None => return Ok(stats), | ||||||||||||||||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+389
to
+392
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit:
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
let stats = ArrowEngineData::try_from_engine_data(stats)?; | ||||||||||||||||||||||||||||||||||||||||||||||||
let stats_batch = stats.record_batch(); | ||||||||||||||||||||||||||||||||||||||||||||||||
let output_schema = stats_batch.schema(); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
// For each unique partition key, generate the associated array | ||||||||||||||||||||||||||||||||||||||||||||||||
// to add to the stats fields | ||||||||||||||||||||||||||||||||||||||||||||||||
let partition_values = compute_partition_arrays(partitions_column, &output_schema)?; | ||||||||||||||||||||||||||||||||||||||||||||||||
if partition_values.is_empty() { | ||||||||||||||||||||||||||||||||||||||||||||||||
return Ok(stats); | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
let mut columns = Vec::default(); | ||||||||||||||||||||||||||||||||||||||||||||||||
for (idx, field) in itertools::enumerate(output_schema.fields()) { | ||||||||||||||||||||||||||||||||||||||||||||||||
match field.name().as_str() { | ||||||||||||||||||||||||||||||||||||||||||||||||
"minValues" => columns.push(merge_partition_fields_into_stats( | ||||||||||||||||||||||||||||||||||||||||||||||||
stats_batch, | ||||||||||||||||||||||||||||||||||||||||||||||||
idx, | ||||||||||||||||||||||||||||||||||||||||||||||||
&partition_values, | ||||||||||||||||||||||||||||||||||||||||||||||||
)?), | ||||||||||||||||||||||||||||||||||||||||||||||||
"maxValues" => columns.push(merge_partition_fields_into_stats( | ||||||||||||||||||||||||||||||||||||||||||||||||
stats_batch, | ||||||||||||||||||||||||||||||||||||||||||||||||
idx, | ||||||||||||||||||||||||||||||||||||||||||||||||
&partition_values, | ||||||||||||||||||||||||||||||||||||||||||||||||
)?), | ||||||||||||||||||||||||||||||||||||||||||||||||
_ => { | ||||||||||||||||||||||||||||||||||||||||||||||||
columns.push(Arc::clone(stats_batch.column(idx))); | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+407
to
+420
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit:
Suggested change
I don't understand what this code is doing tho? What should happen if a stat other than min or max is requested for a partition column? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Update: Figured it out -- partition values only get merged in for min/max stats; other stats remain unchanged. |
||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
let record_batch = RecordBatch::try_new(output_schema, columns)?; | ||||||||||||||||||||||||||||||||||||||||||||||||
Ok(Box::new(ArrowEngineData::new(record_batch))) | ||||||||||||||||||||||||||||||||||||||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not just:
(have to return the last match because arrow and parquet maps are allowed to have duplicate keys, last key wins)