
Commit e539663

Merge remote-tracking branch 'apache/main' into workspace-tokio

2 parents: 2f8caa7 + afb169c
6 files changed: +307 -18 lines

datafusion-examples/README.md

+1
@@ -56,6 +56,7 @@ cargo run --example csv_sql
 - [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients
 - [`make_date.rs`](examples/make_date.rs): Examples of using the make_date function
 - [`memtable.rs`](examples/memtable.rs): Create and query data in memory using SQL and `RecordBatch`es
+- [`pruning.rs`](examples/pruning.rs): Use pruning to rule out files based on statistics
 - [`parquet_sql.rs`](examples/parquet_sql.rs): Build and run a query plan from a SQL statement against a local Parquet file
 - [`parquet_sql_multiple_files.rs`](examples/parquet_sql_multiple_files.rs): Build and run a query plan from a SQL statement against multiple local Parquet files
 - [`query-aws-s3.rs`](examples/external_dependency/query-aws-s3.rs): Configure `object_store` and run a query against files stored in AWS S3
datafusion-examples/examples/pruning.rs

+186

@@ -0,0 +1,186 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::{ArrayRef, BooleanArray, Int32Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::common::{DFSchema, ScalarValue};
use datafusion::execution::context::ExecutionProps;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use datafusion::prelude::*;
use std::collections::HashSet;
use std::sync::Arc;

/// This example shows how to use DataFusion's `PruningPredicate` to prove
/// filter expressions can never be true based on statistics such as min/max
/// values of columns.
///
/// The process is called "pruning" and is commonly used in query engines to
/// quickly eliminate entire files / partitions / row groups of data from
/// consideration using statistical information from a catalog or other
/// metadata.
#[tokio::main]
async fn main() {
    // In this example, we'll use the PruningPredicate to determine if
    // the expression `x = 5 AND y = 10` can never be true based on statistics

    // Start with the expression `x = 5 AND y = 10`
    let expr = col("x").eq(lit(5)).and(col("y").eq(lit(10)));

    // We can analyze this predicate using information provided by the
    // `PruningStatistics` trait, in this case we'll use a simple catalog that
    // models three files. For all rows in each file:
    //
    // File 1: x has values between `4` and `6`
    //         y has the value 10
    //
    // File 2: x has values between `4` and `6`
    //         y has the value of `7`
    //
    // File 3: x has the value 1
    //         nothing is known about the value of y
    let my_catalog = MyCatalog::new();

    // Create a `PruningPredicate`.
    //
    // Note the predicate does not automatically coerce types or simplify
    // expressions. See expr_api.rs examples for how to do this if required
    let predicate = create_pruning_predicate(expr, &my_catalog.schema);

    // Evaluate the predicate for the three files in the catalog
    let prune_results = predicate.prune(&my_catalog).unwrap();
    println!("Pruning results: {prune_results:?}");

    // The result is a `Vec` of bool values, one for each file in the catalog
    assert_eq!(
        prune_results,
        vec![
            // File 1: `x = 5 AND y = 10` can evaluate to true if x has values
            // between `4` and `6`, y has the value `10`, so the file can not
            // be skipped
            //
            // NOTE this doesn't mean there actually are rows that evaluate to
            // true, but the pruning predicate can't prove there aren't any.
            true,
            // File 2: `x = 5 AND y = 10` can never evaluate to true because y
            // has only the value of 7. Thus this file can be skipped.
            false,
            // File 3: `x = 5 AND y = 10` can never evaluate to true because x
            // has the value `1`, and for any value of `y` the expression
            // evaluates to false (`x = 5 AND y = 10` --> `false AND null` -->
            // `false`). Thus this file can also be skipped.
            false
        ]
    );
}

/// A simple model catalog that has information about the three files that
/// store data for a table with two columns (x and y).
struct MyCatalog {
    schema: SchemaRef,
    // (min, max) for x
    x_values: Vec<(Option<i32>, Option<i32>)>,
    // (min, max) for y
    y_values: Vec<(Option<i32>, Option<i32>)>,
}

impl MyCatalog {
    fn new() -> Self {
        MyCatalog {
            schema: Arc::new(Schema::new(vec![
                Field::new("x", DataType::Int32, false),
                Field::new("y", DataType::Int32, false),
            ])),
            x_values: vec![
                // File 1: x has values between `4` and `6`
                (Some(4), Some(6)),
                // File 2: x has values between `4` and `6`
                (Some(4), Some(6)),
                // File 3: x has the value 1
                (Some(1), Some(1)),
            ],
            y_values: vec![
                // File 1: y has the value 10
                (Some(10), Some(10)),
                // File 2: y has the value of `7`
                (Some(7), Some(7)),
                // File 3: nothing is known about the value of y. This is
                // represented as (None, None).
                //
                // Note, returning null means the value isn't known, NOT
                // that we know the entire column is null.
                (None, None),
            ],
        }
    }
}

/// We communicate the statistical information to DataFusion by implementing
/// the PruningStatistics trait.
impl PruningStatistics for MyCatalog {
    fn num_containers(&self) -> usize {
        // there are 3 files in this "catalog", and thus each array returned
        // from min_values and max_values also has 3 elements
        3
    }

    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
        // The pruning predicate evaluates the bounds for multiple expressions
        // at once, so return an array with an element for the minimum value in
        // each file
        match column.name.as_str() {
            "x" => Some(i32_array(self.x_values.iter().map(|(min, _)| min))),
            "y" => Some(i32_array(self.y_values.iter().map(|(min, _)| min))),
            name => panic!("unknown column name: {name}"),
        }
    }

    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
        // similarly to min_values, return an array with an element for the
        // maximum value in each file
        match column.name.as_str() {
            "x" => Some(i32_array(self.x_values.iter().map(|(_, max)| max))),
            "y" => Some(i32_array(self.y_values.iter().map(|(_, max)| max))),
            name => panic!("unknown column name: {name}"),
        }
    }

    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
        // In this example, we know nothing about the number of nulls
        None
    }

    fn contained(
        &self,
        _column: &Column,
        _values: &HashSet<ScalarValue>,
    ) -> Option<BooleanArray> {
        // this method can be used to implement Bloom filter like filtering
        // but we do not illustrate that here
        None
    }
}

fn create_pruning_predicate(expr: Expr, schema: &SchemaRef) -> PruningPredicate {
    let df_schema = DFSchema::try_from(schema.as_ref().clone()).unwrap();
    let props = ExecutionProps::new();
    let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap();
    PruningPredicate::try_new(physical_expr, schema.clone()).unwrap()
}

fn i32_array<'a>(values: impl Iterator<Item = &'a Option<i32>>) -> ArrayRef {
    Arc::new(Int32Array::from_iter(values.cloned()))
}
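Running the example prints the pruning decisions before the final assertion re-checks them; assuming the same invocation convention as the other entries in the README above:

$ cargo run --example pruning
Pruning results: [true, false, false]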

datafusion/common/src/scalar.rs

+8 -12
@@ -27,7 +27,6 @@ use std::iter::repeat;
 use std::str::FromStr;
 use std::sync::Arc;

-use crate::arrow_datafusion_err;
 use crate::cast::{
     as_decimal128_array, as_decimal256_array, as_dictionary_array,
     as_fixed_size_binary_array, as_fixed_size_list_array,
@@ -1639,18 +1638,16 @@ impl ScalarValue {
     scale: i8,
     size: usize,
 ) -> Result<Decimal128Array> {
-    match value {
+    Ok(match value {
         Some(val) => Decimal128Array::from(vec![val; size])
-            .with_precision_and_scale(precision, scale)
-            .map_err(|e| arrow_datafusion_err!(e)),
+            .with_precision_and_scale(precision, scale)?,
         None => {
             let mut builder = Decimal128Array::builder(size)
-                .with_precision_and_scale(precision, scale)
-                .map_err(|e| arrow_datafusion_err!(e))?;
+                .with_precision_and_scale(precision, scale)?;
             builder.append_nulls(size);
-            Ok(builder.finish())
+            builder.finish()
         }
-    }
+    })
 }

 fn build_decimal256_array(
@@ -1659,11 +1656,10 @@ impl ScalarValue {
     scale: i8,
     size: usize,
 ) -> Result<Decimal256Array> {
-    std::iter::repeat(value)
+    Ok(std::iter::repeat(value)
         .take(size)
         .collect::<Decimal256Array>()
-        .with_precision_and_scale(precision, scale)
-        .map_err(|e| arrow_datafusion_err!(e))
+        .with_precision_and_scale(precision, scale)?)
 }

 /// Converts `Vec<ScalarValue>` where each element has type corresponding to
@@ -2053,7 +2049,7 @@ impl ScalarValue {
 fn list_to_array_of_size(arr: &dyn Array, size: usize) -> Result<ArrayRef> {
     let arrays = std::iter::repeat(arr).take(size).collect::<Vec<_>>();
-    arrow::compute::concat(arrays.as_slice()).map_err(|e| arrow_datafusion_err!(e))
+    Ok(arrow::compute::concat(arrays.as_slice())?)
 }

 /// Retrieve ScalarValue for each row in `array`
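The pattern in this diff works because `DataFusionError` has a `From<ArrowError>` conversion, which the `?` operator applies implicitly; that is what lets the explicit `map_err(|e| arrow_datafusion_err!(e))` calls be dropped. A minimal self-contained sketch of the mechanism, using a hypothetical `MyError` type in place of DataFusion's own error:

use arrow::error::ArrowError;

#[derive(Debug)]
enum MyError {
    Arrow(ArrowError),
}

// The `From` impl is what allows `?` to convert the error type
// without an explicit `map_err`.
impl From<ArrowError> for MyError {
    fn from(e: ArrowError) -> Self {
        MyError::Arrow(e)
    }
}

fn fallible() -> Result<(), MyError> {
    // Any call returning Result<_, ArrowError> can now use `?` directly.
    let arrow_result: Result<(), ArrowError> =
        Err(ArrowError::ComputeError("boom".to_string()));
    arrow_result?;
    Ok(())
}

fn main() {
    assert!(fallible().is_err());
}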

datafusion/core/src/physical_optimizer/pruning.rs

+8
@@ -136,6 +136,8 @@ pub trait PruningStatistics {
 /// possibly evaluate to `true` given information about a column provided by
 /// [`PruningStatistics`].
 ///
+/// # Introduction
+///
 /// `PruningPredicate` analyzes filter expressions using statistics such as
 /// min/max values and null counts, attempting to prove a "container" (e.g.
 /// Parquet Row Group) can be skipped without reading the actual data,
@@ -163,6 +165,12 @@ pub trait PruningStatistics {
 ///
 /// # Example
 ///
+/// See the [`pruning.rs` example in the `datafusion-examples`] for a complete
+/// example of how to use `PruningPredicate` to prune files based on min/max
+/// values.
+///
+/// [`pruning.rs` example in the `datafusion-examples`]: https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/pruning.rs
+///
 /// Given an expression like `x = 5` and statistics for 3 containers (Row
 /// Groups, files, etc) `A`, `B`, and `C`:
 ///
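For intuition about what these docs describe: a predicate like `x = 5` prunes a container when its statistics prove the bound check `x_min <= 5 AND 5 <= x_max` is false. A hand-rolled sketch of that idea (not DataFusion's API; the real `PruningPredicate` builds and evaluates this rewrite as a physical expression over the statistics arrays):

// Minimal min/max pruning check for `x = 5`: a container may contain
// matching rows only if 5 lies within [min, max]. Missing statistics
// must conservatively keep the container.
fn may_contain(min: Option<i32>, max: Option<i32>, value: i32) -> bool {
    match (min, max) {
        (Some(min), Some(max)) => min <= value && value <= max,
        // Without statistics nothing can be proven, so keep it.
        _ => true,
    }
}

fn main() {
    // Containers A, B, C with (min, max) statistics for column x.
    let stats = [(Some(4), Some(6)), (Some(7), Some(9)), (None, None)];
    let keep: Vec<bool> = stats
        .iter()
        .map(|&(min, max)| may_contain(min, max, 5))
        .collect();
    // Only container B (7..=9) can be proven to contain no matches.
    assert_eq!(keep, vec![true, false, true]);
}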

datafusion/physical-plan/src/joins/sort_merge_join.rs

+22 -1
@@ -1209,7 +1209,15 @@ impl SMJStream {
 ) {
     // The reverse of the selection mask. For the rows that did not pass the
     // join filter above, we need to join them (left or right) with null rows
     // for outer joins.
-    let not_mask = compute::not(mask)?;
+    let not_mask = if mask.null_count() > 0 {
+        // If the mask contains nulls, use `prep_null_mask_filter` to treat
+        // the nulls as false, so that rows where the mask itself was null
+        // are also produced as null-joined rows.
+        compute::not(&compute::prep_null_mask_filter(mask))?
+    } else {
+        compute::not(mask)?
+    };
+
     let null_joined_batch =
         compute::filter_record_batch(&output_batch, &not_mask)?;

@@ -1254,6 +1262,19 @@ impl SMJStream {

 // For full join, we also need to output the null joined rows from the buffered side
 if matches!(self.join_type, JoinType::Full) {
+    // Refine the not-mask further for the buffered side: we only want to
+    // output the rows that are not already null-joined with the streamed
+    // side, i.e. the rows whose entries in `buffered_indices` are not null.
+    let not_mask = if let Some(nulls) = buffered_indices.nulls() {
+        let mask = not_mask.values() & nulls.inner();
+        BooleanArray::new(mask, None)
+    } else {
+        not_mask
+    };
+
+    let null_joined_batch =
+        compute::filter_record_batch(&output_batch, &not_mask)?;
+
     let mut streamed_columns = self
         .streamed_schema
         .fields()
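The motivation for the `prep_null_mask_filter` change above can be seen with a standalone sketch over a nullable mask. Both kernels used here are the public `arrow::compute` functions the diff itself calls; this illustrates the behavior, it is not code from the commit:

use arrow::array::BooleanArray;
use arrow::compute;

fn main() {
    // A join-filter mask where the third row evaluated to NULL.
    let mask = BooleanArray::from(vec![Some(true), Some(false), None]);

    // Plain `not` propagates the null, so the null row would never be
    // selected by the subsequent filter (filters treat null as false):
    let naive = compute::not(&mask).unwrap();
    assert_eq!(
        naive.iter().collect::<Vec<_>>(),
        vec![Some(false), Some(true), None]
    );

    // Treating null as "did not pass the filter" first keeps that row:
    let fixed = compute::not(&compute::prep_null_mask_filter(&mask)).unwrap();
    assert_eq!(
        fixed.iter().collect::<Vec<_>>(),
        vec![Some(false), Some(true), Some(true)]
    );
}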
