Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make eval_sql_where available to DefaultPredicateEvaluator #627

Merged
merged 17 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions kernel/src/engine/parquet_row_group_skipping.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
//! An implementation of parquet row group skipping using data skipping predicates over footer stats.
use crate::predicates::parquet_stats_skipping::{
Copy link
Collaborator Author

@scovich scovich Jan 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aside: Not sure how this out of order import escaped cargo fmt before now?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like it's just flattening it and you moved the import anyways.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't cargo fmt order imports alphabetically? If so, how did use crate::predicates end up before use crate::expressions?

ParquetStatsProvider, ParquetStatsSkippingFilter as _,
};
use crate::expressions::{ColumnName, Expression, Scalar, UnaryExpression, BinaryExpression, VariadicExpression};
use crate::predicates::SqlPredicateEvaluator as _;
use crate::predicates::parquet_stats_skipping::ParquetStatsProvider;
use crate::schema::{DataType, PrimitiveType};
use chrono::{DateTime, Days};
use parquet::arrow::arrow_reader::ArrowReaderBuilder;
Expand Down
104 changes: 104 additions & 0 deletions kernel/src/predicates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,102 @@ pub(crate) trait PredicateEvaluator {
}
}

/// Support for SQL WHERE semantics in data skipping predicates that may produce a NULL result.
///
/// By default, [`apply_expr`] can produce unwelcome behavior for comparisons involving all-NULL
scovich marked this conversation as resolved.
Show resolved Hide resolved
/// columns (e.g. `a == 10`), because the (legitimately NULL) min/max stats are interpreted as
/// stats-missing that produces a NULL data skipping result). The resulting NULL can "poison"
/// the entire expression, causing it to return NULL instead of FALSE that would allow skipping.
///
/// Meanwhile, SQL WHERE semantics only keeps rows for which the filter evaluates to TRUE --
/// effectively turning `<expr>` into the null-safe predicate `AND(<expr> IS NOT NULL, <expr>)`.
///
/// We cannot safely evaluate an arbitrary data skipping predicate with null-safe semantics because
/// NULL could also mean e.g. unknown/missing stats. However, we CAN safely turn a column reference
/// in a comparison into a null-safe comparison, as long as the comparison's parent expressions are
/// all AND. To see why, consider a WHERE clause filter of the form:
///
/// ```text
/// AND(..., a {cmp} b, ...)
/// ```
///
/// In order allow skipping based on the all-null `a` or `b`, we want to actually evaluate:
/// ```text
/// AND(..., AND(a IS NOT NULL, b IS NOT NULL, a {cmp} b), ...)
/// ```
///
/// This optimization relies on the fact that we only support IS [NOT] NULL skipping for
/// columns, and we only support skipping for comparisons between columns and literals. Thus, a
/// typical case such as: `AND(..., x < 10, ...)` would in the all-null case be evaluated as:
/// ```text
/// AND(..., AND(x IS NOT NULL, 10 IS NOT NULL, x < 10), ...)
/// AND(..., AND(FALSE, NULL, NULL), ...)
/// AND(..., FALSE, ...)
/// FALSE
/// ```
///
/// In the not all-null case, it would instead evaluate as:
/// ```text
/// AND(..., AND(x IS NOT NULL, 10 IS NOT NULL, x < 10), ...)
/// AND(..., AND(TRUE, NULL, <result>), ...)
/// ```
///
/// If the result was FALSE, it forces both inner and outer AND to FALSE, as desired. If the
/// result was TRUE or NULL, then it does not contribute to data skipping but also does not
/// block it if other legs of the AND evaluate to FALSE.
pub(crate) trait SqlPredicateEvaluator: PredicateEvaluator<Output = bool> {
/// Attempts to filter using SQL WHERE semantics.
fn eval_sql_where(&self, filter: &Expr) -> Option<bool> {
use Expr::{Binary, Variadic};
match filter {
Variadic(VariadicExpression {
op: VariadicOperator::And,
exprs,
}) => {
let exprs: Vec<_> = exprs
.iter()
.map(|expr| self.eval_sql_where(expr))
.map(|result| match result {
Some(value) => Expr::literal(value),
None => Expr::null_literal(DataType::BOOLEAN),
})
.collect();
self.eval_variadic(VariadicOperator::And, &exprs, false)
}
Binary(BinaryExpression { op, left, right }) => {
self.eval_binary_nullsafe(*op, left, right)
}
_ => self.eval_expr(filter, false),
}
}

/// Helper method for [`apply_sql_where`], that evaluates `{a} {cmp} {b}` as
/// ```text
/// AND({a} IS NOT NULL, {b} IS NOT NULL, {a} {cmp} {b})
/// ```
///
/// The null checks only apply to column expressions, so at least one of them will always be
/// NULL (since we don't support skipping over column-column comparisons). If any NULL check
/// fails (producing FALSE), it short-circuits the entire AND without ever evaluating the
/// comparison. Otherwise, the original comparison will run and -- if FALSE -- can cause data
/// skipping as usual.
fn eval_binary_nullsafe(&self, op: BinaryOperator, left: &Expr, right: &Expr) -> Option<bool> {
use UnaryOperator::IsNull;
// Convert `a {cmp} b` to `AND(a IS NOT NULL, b IS NOT NULL, a {cmp} b)`,
// and only evaluate the comparison if the null checks don't short circuit.
if let Some(false) = self.eval_unary(IsNull, left, true) {
return Some(false);
}
if let Some(false) = self.eval_unary(IsNull, right, true) {
return Some(false);
}
self.eval_binary(op, left, right, false)
}
}

/// Blanket impl for all boolean-output predicate evaluators
impl<T: PredicateEvaluator<Output = bool>> SqlPredicateEvaluator for T {}

/// A collection of provided methods from the [`PredicateEvaluator`] trait, factored out to allow
/// reuse by the different predicate evaluator implementations.
pub(crate) struct PredicateEvaluatorDefaults;
Expand Down Expand Up @@ -326,6 +422,14 @@ impl ResolveColumnAsScalar for UnimplementedColumnResolver {
}
}

// Used internally and by some tests
pub(crate) struct EmptyColumnResolver;
impl ResolveColumnAsScalar for EmptyColumnResolver {
fn resolve_column(&self, _col: &ColumnName) -> Option<Scalar> {
None
}
}

// In testing, it is convenient to just build a hashmap of scalar values.
#[cfg(test)]
impl ResolveColumnAsScalar for std::collections::HashMap<ColumnName, Scalar> {
Expand Down
115 changes: 2 additions & 113 deletions kernel/src/predicates/parquet_stats_skipping.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
//! An implementation of data skipping that leverages parquet stats from the file footer.
use crate::expressions::{
BinaryExpression, BinaryOperator, ColumnName, Expression as Expr, Scalar, UnaryOperator,
VariadicExpression, VariadicOperator,
};
use crate::predicates::{
DataSkippingPredicateEvaluator, PredicateEvaluator, PredicateEvaluatorDefaults,
};
use crate::expressions::{BinaryOperator, ColumnName, Scalar, VariadicOperator};
use crate::predicates::{DataSkippingPredicateEvaluator, PredicateEvaluatorDefaults};
use crate::schema::DataType;
use std::cmp::Ordering;

Expand Down Expand Up @@ -96,109 +91,3 @@ impl<T: ParquetStatsProvider> DataSkippingPredicateEvaluator for T {
PredicateEvaluatorDefaults::finish_eval_variadic(op, exprs, inverted)
}
}

/// Data skipping based on parquet footer stats (e.g. row group skipping). The required methods
/// fetch stats values for requested columns (if available and with compatible types), and the
/// provided methods implement the actual skipping logic.
///
/// NOTE: We are given a row-based filter, but stats-based predicate evaluation -- which applies to
/// a SET of rows -- has different semantics than row-based predicate evaluation. The provided
/// methods of this class convert various supported expressions into data skipping predicates, and
/// then return the result of evaluating the translated filter.
pub(crate) trait ParquetStatsSkippingFilter {
/// Attempts to filter using SQL WHERE semantics.
///
/// By default, [`apply_expr`] can produce unwelcome behavior for comparisons involving all-NULL
/// columns (e.g. `a == 10`), because the (legitimately NULL) min/max stats are interpreted as
/// stats-missing that produces a NULL data skipping result). The resulting NULL can "poison"
/// the entire expression, causing it to return NULL instead of FALSE that would allow skipping.
///
/// Meanwhile, SQL WHERE semantics only keep rows for which the filter evaluates to TRUE --
/// effectively turning `<expr>` into the null-safe predicate `AND(<expr> IS NOT NULL, <expr>)`.
///
/// We cannot safely evaluate an arbitrary data skipping expression with null-safe semantics
/// (because NULL could also mean missing-stats), but we CAN safely turn a column reference in a
/// comparison into a null-safe comparison, as long as the comparison's parent expressions are
/// all AND. To see why, consider a WHERE clause filter of the form:
///
/// ```text
/// AND(..., a {cmp} b, ...)
/// ```
///
/// In order allow skipping based on the all-null `a` or `b`, we want to actually evaluate:
/// ```text
/// AND(..., AND(a IS NOT NULL, b IS NOT NULL, a {cmp} b), ...)
/// ```
///
/// This optimization relies on the fact that we only support IS [NOT] NULL skipping for
/// columns, and we only support skipping for comparisons between columns and literals. Thus, a
/// typical case such as: `AND(..., x < 10, ...)` would in the all-null case be evaluated as:
/// ```text
/// AND(..., AND(x IS NOT NULL, 10 IS NOT NULL, x < 10), ...)
/// AND(..., AND(FALSE, NULL, NULL), ...)
/// AND(..., FALSE, ...)
/// FALSE
/// ```
///
/// In the not all-null case, it would instead evaluate as:
/// ```text
/// AND(..., AND(x IS NOT NULL, 10 IS NOT NULL, x < 10), ...)
/// AND(..., AND(TRUE, NULL, <result>), ...)
/// ```
///
/// If the result was FALSE, it forces both inner and outer AND to FALSE, as desired. If the
/// result was TRUE or NULL, then it does not contribute to data skipping but also does not
/// block it if other legs of the AND evaluate to FALSE.
// TODO: If these are generally useful, we may want to move them into PredicateEvaluator?
fn eval_sql_where(&self, filter: &Expr) -> Option<bool>;
fn eval_binary_nullsafe(&self, op: BinaryOperator, left: &Expr, right: &Expr) -> Option<bool>;
}

impl<T: DataSkippingPredicateEvaluator<Output = bool>> ParquetStatsSkippingFilter for T {
fn eval_sql_where(&self, filter: &Expr) -> Option<bool> {
use Expr::{Binary, Variadic};
match filter {
Variadic(VariadicExpression {
op: VariadicOperator::And,
exprs,
}) => {
let exprs: Vec<_> = exprs
.iter()
.map(|expr| self.eval_sql_where(expr))
.map(|result| match result {
Some(value) => Expr::literal(value),
None => Expr::null_literal(DataType::BOOLEAN),
})
.collect();
self.eval_variadic(VariadicOperator::And, &exprs, false)
}
Binary(BinaryExpression { op, left, right }) => {
self.eval_binary_nullsafe(*op, left, right)
}
_ => self.eval_expr(filter, false),
}
}

/// Helper method for [`apply_sql_where`], that evaluates `{a} {cmp} {b}` as
/// ```text
/// AND({a} IS NOT NULL, {b} IS NOT NULL, {a} {cmp} {b})
/// ```
///
/// The null checks only apply to column expressions, so at least one of them will always be
/// NULL (since we don't support skipping over column-column comparisons). If any NULL check
/// fails (producing FALSE), it short-circuits the entire AND without ever evaluating the
/// comparison. Otherwise, the original comparison will run and -- if FALSE -- can cause data
/// skipping as usual.
fn eval_binary_nullsafe(&self, op: BinaryOperator, left: &Expr, right: &Expr) -> Option<bool> {
use UnaryOperator::IsNull;
// Convert `a {cmp} b` to `AND(a IS NOT NULL, b IS NOT NULL, a {cmp} b)`,
// and only evaluate the comparison if the null checks don't short circuit.
if let Some(false) = self.eval_unary(IsNull, left, true) {
return Some(false);
}
if let Some(false) = self.eval_unary(IsNull, right, true) {
return Some(false);
}
self.eval_binary(op, left, right, false)
}
}
125 changes: 1 addition & 124 deletions kernel/src/predicates/parquet_stats_skipping/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::*;
use crate::expressions::{column_expr, Expression as Expr};
use crate::predicates::PredicateEvaluator;
use crate::predicates::PredicateEvaluator as _;
use crate::DataType;

const TRUE: Option<bool> = Some(true);
Expand Down Expand Up @@ -257,126 +257,3 @@ fn test_eval_is_null() {
// all nulls
do_test(2, &[TRUE, FALSE]);
}

struct AllNullTestFilter;
impl ParquetStatsProvider for AllNullTestFilter {
fn get_parquet_min_stat(&self, _col: &ColumnName, _data_type: &DataType) -> Option<Scalar> {
None
}

fn get_parquet_max_stat(&self, _col: &ColumnName, _data_type: &DataType) -> Option<Scalar> {
None
}

fn get_parquet_nullcount_stat(&self, _col: &ColumnName) -> Option<i64> {
Some(self.get_parquet_rowcount_stat())
}

fn get_parquet_rowcount_stat(&self) -> i64 {
10
}
}

#[test]
fn test_sql_where() {
let col = &column_expr!("x");
const VAL: Expr = Expr::Literal(Scalar::Integer(1));
const NULL: Expr = Expr::Literal(Scalar::Null(DataType::BOOLEAN));
const FALSE: Expr = Expr::Literal(Scalar::Boolean(false));
const TRUE: Expr = Expr::Literal(Scalar::Boolean(true));

// Basic sanity checks
expect_eq!(AllNullTestFilter.eval_sql_where(&VAL), None, "WHERE {VAL}");
expect_eq!(AllNullTestFilter.eval_sql_where(col), None, "WHERE {col}");
expect_eq!(
AllNullTestFilter.eval_sql_where(&Expr::is_null(col.clone())),
Some(true), // No injected NULL checks
"WHERE {col} IS NULL"
);
expect_eq!(
AllNullTestFilter.eval_sql_where(&Expr::lt(TRUE, FALSE)),
Some(false), // Injected NULL checks don't short circuit when inputs are NOT NULL
"WHERE {TRUE} < {FALSE}"
);

// Constrast normal vs SQL WHERE semantics - comparison
expect_eq!(
AllNullTestFilter.eval_expr(&Expr::lt(col.clone(), VAL), false),
None,
"{col} < {VAL}"
);
expect_eq!(
AllNullTestFilter.eval_sql_where(&Expr::lt(col.clone(), VAL)),
Some(false),
"WHERE {col} < {VAL}"
);
expect_eq!(
AllNullTestFilter.eval_expr(&Expr::lt(VAL, col.clone()), false),
None,
"{VAL} < {col}"
);
expect_eq!(
AllNullTestFilter.eval_sql_where(&Expr::lt(VAL, col.clone())),
Some(false),
"WHERE {VAL} < {col}"
);

// Constrast normal vs SQL WHERE semantics - comparison inside AND
expect_eq!(
AllNullTestFilter.eval_expr(&Expr::and(NULL, Expr::lt(col.clone(), VAL)), false),
None,
"{NULL} AND {col} < {VAL}"
);
expect_eq!(
AllNullTestFilter.eval_sql_where(&Expr::and(NULL, Expr::lt(col.clone(), VAL),)),
Some(false),
"WHERE {NULL} AND {col} < {VAL}"
);

expect_eq!(
AllNullTestFilter.eval_expr(&Expr::and(TRUE, Expr::lt(col.clone(), VAL)), false),
None, // NULL (from the NULL check) is stronger than TRUE
"{TRUE} AND {col} < {VAL}"
);
expect_eq!(
AllNullTestFilter.eval_sql_where(&Expr::and(TRUE, Expr::lt(col.clone(), VAL),)),
Some(false), // FALSE (from the NULL check) is stronger than TRUE
"WHERE {TRUE} AND {col} < {VAL}"
);

// Contrast normal vs. SQL WHERE semantics - comparison inside AND inside AND
expect_eq!(
AllNullTestFilter.eval_expr(
&Expr::and(TRUE, Expr::and(NULL, Expr::lt(col.clone(), VAL)),),
false,
),
None,
"{TRUE} AND ({NULL} AND {col} < {VAL})"
);
expect_eq!(
AllNullTestFilter.eval_sql_where(&Expr::and(
TRUE,
Expr::and(NULL, Expr::lt(col.clone(), VAL)),
)),
Some(false),
"WHERE {TRUE} AND ({NULL} AND {col} < {VAL})"
);

// Semantics are the same for comparison inside OR inside AND
expect_eq!(
AllNullTestFilter.eval_expr(
&Expr::or(FALSE, Expr::and(NULL, Expr::lt(col.clone(), VAL)),),
false,
),
None,
"{FALSE} OR ({NULL} AND {col} < {VAL})"
);
expect_eq!(
AllNullTestFilter.eval_sql_where(&Expr::or(
FALSE,
Expr::and(NULL, Expr::lt(col.clone(), VAL)),
)),
None,
"WHERE {FALSE} OR ({NULL} AND {col} < {VAL})"
);
}
Loading
Loading