Skip to content

Commit ad5a04f

Browse files
authored
feat(optimizer): Enable filter pushdown on window functions (#14026)
* feat(optimizer): Enable filter pushdown on window functions Ensures selections can be pushed past window functions similarly to what is already done with aggregations, when possible. * fix: Add missing dependency * minor(optimizer): Use 'datafusion-functions-window' as a dev dependency * docs(optimizer): Add example to filter pushdown on LogicalPlan::Window
1 parent da4208b commit ad5a04f

File tree

3 files changed

+600
-4
lines changed

3 files changed

+600
-4
lines changed

datafusion/optimizer/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ regex-syntax = "0.8.0"
5555
async-trait = { workspace = true }
5656
ctor = { workspace = true }
5757
datafusion-functions-aggregate = { workspace = true }
58+
datafusion-functions-window = { workspace = true }
5859
datafusion-functions-window-common = { workspace = true }
5960
datafusion-sql = { workspace = true }
6061
env_logger = { workspace = true }

datafusion/optimizer/src/push_down_filter.rs

+306-4
Original file line numberDiff line numberDiff line change
@@ -985,6 +985,87 @@ impl OptimizerRule for PushDownFilter {
985985
}
986986
})
987987
}
988+
// Tries to push filters based on the partition key(s) of the window function(s) used.
989+
// Example:
990+
// Before:
991+
// Filter: (a > 1) and (b > 1) and (c > 1)
992+
// Window: func() PARTITION BY [a] ...
993+
// ---
994+
// After:
995+
// Filter: (b > 1) and (c > 1)
996+
// Window: func() PARTITION BY [a] ...
997+
// Filter: (a > 1)
998+
LogicalPlan::Window(window) => {
999+
// Retrieve the set of potential partition keys on which filters can be pushed down.
1000+
// Unlike aggregations, where there is only one statement per SELECT, there can be
1001+
// multiple window functions, each with potentially different partition keys.
1002+
// Therefore, we need to ensure that any potential partition key returned is used in
1003+
// ALL window functions. Otherwise, filters cannot be pushed down through that column.
1004+
let potential_partition_keys = window
1005+
.window_expr
1006+
.iter()
1007+
.map(|e| {
1008+
if let Expr::WindowFunction(window_expression) = e {
1009+
window_expression
1010+
.partition_by
1011+
.iter()
1012+
.map(|c| {
1013+
Column::from_qualified_name(
1014+
c.schema_name().to_string(),
1015+
)
1016+
})
1017+
.collect::<HashSet<_>>()
1018+
} else {
1019+
// window function expressions are only Expr::WindowFunction
1020+
unreachable!()
1021+
}
1022+
})
1023+
// performs the set intersection of the partition keys of all window functions,
1024+
// returning only the common ones
1025+
.reduce(|a, b| &a & &b)
1026+
.unwrap_or_default();
1027+
1028+
let predicates = split_conjunction_owned(filter.predicate);
1029+
let mut keep_predicates = vec![];
1030+
let mut push_predicates = vec![];
1031+
for expr in predicates {
1032+
let cols = expr.column_refs();
1033+
if cols.iter().all(|c| potential_partition_keys.contains(c)) {
1034+
push_predicates.push(expr);
1035+
} else {
1036+
keep_predicates.push(expr);
1037+
}
1038+
}
1039+
1040+
// Unlike with aggregations, there are no cases where we have to replace, e.g.,
1041+
// `a+b` with Column(a)+Column(b). This is because partition expressions are not
1042+
// available as standalone columns to the user. For example, while an aggregation on
1043+
// `a+b` becomes Column(a + b), in a window partition it becomes
1044+
// `func() PARTITION BY [a + b] ...`. Thus, filters on expressions always remain in
1045+
// place, so we can use `push_predicates` directly. This is consistent with other
1046+
// optimizers, such as the one used by Postgres.
1047+
1048+
let window_input = Arc::clone(&window.input);
1049+
Transformed::yes(LogicalPlan::Window(window))
1050+
.transform_data(|new_plan| {
1051+
// If we have a filter to push, we push it down to the input of the window
1052+
if let Some(predicate) = conjunction(push_predicates) {
1053+
let new_filter = make_filter(predicate, window_input)?;
1054+
insert_below(new_plan, new_filter)
1055+
} else {
1056+
Ok(Transformed::no(new_plan))
1057+
}
1058+
})?
1059+
.map_data(|child_plan| {
1060+
// if there are any remaining predicates we can't push, add them
1061+
// back as a filter
1062+
if let Some(predicate) = conjunction(keep_predicates) {
1063+
make_filter(predicate, Arc::new(child_plan))
1064+
} else {
1065+
Ok(child_plan)
1066+
}
1067+
})
1068+
}
9881069
LogicalPlan::Join(join) => push_down_join(join, Some(&filter.predicate)),
9891070
LogicalPlan::TableScan(scan) => {
9901071
let filter_predicates = split_conjunction(&filter.predicate);
@@ -1289,12 +1370,12 @@ mod tests {
12891370
use async_trait::async_trait;
12901371

12911372
use datafusion_common::{DFSchemaRef, ScalarValue};
1292-
use datafusion_expr::expr::ScalarFunction;
1373+
use datafusion_expr::expr::{ScalarFunction, WindowFunction};
12931374
use datafusion_expr::logical_plan::table_scan;
12941375
use datafusion_expr::{
1295-
col, in_list, in_subquery, lit, ColumnarValue, Extension, LogicalPlanBuilder,
1296-
ScalarUDF, ScalarUDFImpl, Signature, TableSource, TableType,
1297-
UserDefinedLogicalNodeCore, Volatility,
1376+
col, in_list, in_subquery, lit, ColumnarValue, ExprFunctionExt, Extension,
1377+
LogicalPlanBuilder, ScalarUDF, ScalarUDFImpl, Signature, TableSource, TableType,
1378+
UserDefinedLogicalNodeCore, Volatility, WindowFunctionDefinition,
12981379
};
12991380

13001381
use crate::optimizer::Optimizer;
@@ -1442,6 +1523,227 @@ mod tests {
14421523
assert_optimized_plan_eq(plan, expected)
14431524
}
14441525

1526+
/// verifies that when partitioning by 'a' and 'b', and filtering by 'b', 'b' is pushed
1527+
#[test]
1528+
fn filter_move_window() -> Result<()> {
1529+
let table_scan = test_table_scan()?;
1530+
1531+
let window = Expr::WindowFunction(WindowFunction::new(
1532+
WindowFunctionDefinition::WindowUDF(
1533+
datafusion_functions_window::rank::rank_udwf(),
1534+
),
1535+
vec![],
1536+
))
1537+
.partition_by(vec![col("a"), col("b")])
1538+
.order_by(vec![col("c").sort(true, true)])
1539+
.build()
1540+
.unwrap();
1541+
1542+
let plan = LogicalPlanBuilder::from(table_scan)
1543+
.window(vec![window])?
1544+
.filter(col("b").gt(lit(10i64)))?
1545+
.build()?;
1546+
1547+
let expected = "\
1548+
WindowAggr: windowExpr=[[rank() PARTITION BY [test.a, test.b] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
1549+
\n TableScan: test, full_filters=[test.b > Int64(10)]";
1550+
assert_optimized_plan_eq(plan, expected)
1551+
}
1552+
1553+
/// verifies that when partitioning by 'a' and 'b', and filtering by 'a' and 'b', both 'a' and
1554+
/// 'b' are pushed
1555+
#[test]
1556+
fn filter_move_complex_window() -> Result<()> {
1557+
let table_scan = test_table_scan()?;
1558+
1559+
let window = Expr::WindowFunction(WindowFunction::new(
1560+
WindowFunctionDefinition::WindowUDF(
1561+
datafusion_functions_window::rank::rank_udwf(),
1562+
),
1563+
vec![],
1564+
))
1565+
.partition_by(vec![col("a"), col("b")])
1566+
.order_by(vec![col("c").sort(true, true)])
1567+
.build()
1568+
.unwrap();
1569+
1570+
let plan = LogicalPlanBuilder::from(table_scan)
1571+
.window(vec![window])?
1572+
.filter(and(col("a").gt(lit(10i64)), col("b").eq(lit(1i64))))?
1573+
.build()?;
1574+
1575+
let expected = "\
1576+
WindowAggr: windowExpr=[[rank() PARTITION BY [test.a, test.b] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
1577+
\n TableScan: test, full_filters=[test.a > Int64(10), test.b = Int64(1)]";
1578+
assert_optimized_plan_eq(plan, expected)
1579+
}
1580+
1581+
/// verifies that when partitioning by 'a' and filtering by 'a' and 'b', only 'a' is pushed
1582+
#[test]
1583+
fn filter_move_partial_window() -> Result<()> {
1584+
let table_scan = test_table_scan()?;
1585+
1586+
let window = Expr::WindowFunction(WindowFunction::new(
1587+
WindowFunctionDefinition::WindowUDF(
1588+
datafusion_functions_window::rank::rank_udwf(),
1589+
),
1590+
vec![],
1591+
))
1592+
.partition_by(vec![col("a")])
1593+
.order_by(vec![col("c").sort(true, true)])
1594+
.build()
1595+
.unwrap();
1596+
1597+
let plan = LogicalPlanBuilder::from(table_scan)
1598+
.window(vec![window])?
1599+
.filter(and(col("a").gt(lit(10i64)), col("b").eq(lit(1i64))))?
1600+
.build()?;
1601+
1602+
let expected = "\
1603+
Filter: test.b = Int64(1)\
1604+
\n WindowAggr: windowExpr=[[rank() PARTITION BY [test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
1605+
\n TableScan: test, full_filters=[test.a > Int64(10)]";
1606+
assert_optimized_plan_eq(plan, expected)
1607+
}
1608+
1609+
/// verifies that filters on partition expressions are not pushed, as the single expression
1610+
/// column is not available to the user, unlike with aggregations
1611+
#[test]
1612+
fn filter_expression_keep_window() -> Result<()> {
1613+
let table_scan = test_table_scan()?;
1614+
1615+
let window = Expr::WindowFunction(WindowFunction::new(
1616+
WindowFunctionDefinition::WindowUDF(
1617+
datafusion_functions_window::rank::rank_udwf(),
1618+
),
1619+
vec![],
1620+
))
1621+
.partition_by(vec![add(col("a"), col("b"))]) // PARTITION BY a + b
1622+
.order_by(vec![col("c").sort(true, true)])
1623+
.build()
1624+
.unwrap();
1625+
1626+
let plan = LogicalPlanBuilder::from(table_scan)
1627+
.window(vec![window])?
1628+
// unlike with aggregations, single partition column "test.a + test.b" is not available
1629+
// to the plan, so we use multiple columns when filtering
1630+
.filter(add(col("a"), col("b")).gt(lit(10i64)))?
1631+
.build()?;
1632+
1633+
let expected = "\
1634+
Filter: test.a + test.b > Int64(10)\
1635+
\n WindowAggr: windowExpr=[[rank() PARTITION BY [test.a + test.b] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
1636+
\n TableScan: test";
1637+
assert_optimized_plan_eq(plan, expected)
1638+
}
1639+
1640+
/// verifies that filters are not pushed on order by columns (that are not used in partitioning)
1641+
#[test]
1642+
fn filter_order_keep_window() -> Result<()> {
1643+
let table_scan = test_table_scan()?;
1644+
1645+
let window = Expr::WindowFunction(WindowFunction::new(
1646+
WindowFunctionDefinition::WindowUDF(
1647+
datafusion_functions_window::rank::rank_udwf(),
1648+
),
1649+
vec![],
1650+
))
1651+
.partition_by(vec![col("a")])
1652+
.order_by(vec![col("c").sort(true, true)])
1653+
.build()
1654+
.unwrap();
1655+
1656+
let plan = LogicalPlanBuilder::from(table_scan)
1657+
.window(vec![window])?
1658+
.filter(col("c").gt(lit(10i64)))?
1659+
.build()?;
1660+
1661+
let expected = "\
1662+
Filter: test.c > Int64(10)\
1663+
\n WindowAggr: windowExpr=[[rank() PARTITION BY [test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
1664+
\n TableScan: test";
1665+
assert_optimized_plan_eq(plan, expected)
1666+
}
1667+
1668+
/// verifies that when we use multiple window functions with a common partition key, the filter
1669+
/// on that key is pushed
1670+
#[test]
1671+
fn filter_multiple_windows_common_partitions() -> Result<()> {
1672+
let table_scan = test_table_scan()?;
1673+
1674+
let window1 = Expr::WindowFunction(WindowFunction::new(
1675+
WindowFunctionDefinition::WindowUDF(
1676+
datafusion_functions_window::rank::rank_udwf(),
1677+
),
1678+
vec![],
1679+
))
1680+
.partition_by(vec![col("a")])
1681+
.order_by(vec![col("c").sort(true, true)])
1682+
.build()
1683+
.unwrap();
1684+
1685+
let window2 = Expr::WindowFunction(WindowFunction::new(
1686+
WindowFunctionDefinition::WindowUDF(
1687+
datafusion_functions_window::rank::rank_udwf(),
1688+
),
1689+
vec![],
1690+
))
1691+
.partition_by(vec![col("b"), col("a")])
1692+
.order_by(vec![col("c").sort(true, true)])
1693+
.build()
1694+
.unwrap();
1695+
1696+
let plan = LogicalPlanBuilder::from(table_scan)
1697+
.window(vec![window1, window2])?
1698+
.filter(col("a").gt(lit(10i64)))? // a appears in both window functions
1699+
.build()?;
1700+
1701+
let expected = "\
1702+
WindowAggr: windowExpr=[[rank() PARTITION BY [test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, rank() PARTITION BY [test.b, test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
1703+
\n TableScan: test, full_filters=[test.a > Int64(10)]";
1704+
assert_optimized_plan_eq(plan, expected)
1705+
}
1706+
1707+
/// verifies that when we use multiple window functions with different partition keys, the
1708+
/// filter cannot be pushed
1709+
#[test]
1710+
fn filter_multiple_windows_disjoint_partitions() -> Result<()> {
1711+
let table_scan = test_table_scan()?;
1712+
1713+
let window1 = Expr::WindowFunction(WindowFunction::new(
1714+
WindowFunctionDefinition::WindowUDF(
1715+
datafusion_functions_window::rank::rank_udwf(),
1716+
),
1717+
vec![],
1718+
))
1719+
.partition_by(vec![col("a")])
1720+
.order_by(vec![col("c").sort(true, true)])
1721+
.build()
1722+
.unwrap();
1723+
1724+
let window2 = Expr::WindowFunction(WindowFunction::new(
1725+
WindowFunctionDefinition::WindowUDF(
1726+
datafusion_functions_window::rank::rank_udwf(),
1727+
),
1728+
vec![],
1729+
))
1730+
.partition_by(vec![col("b"), col("a")])
1731+
.order_by(vec![col("c").sort(true, true)])
1732+
.build()
1733+
.unwrap();
1734+
1735+
let plan = LogicalPlanBuilder::from(table_scan)
1736+
.window(vec![window1, window2])?
1737+
.filter(col("b").gt(lit(10i64)))? // b only appears in one window function
1738+
.build()?;
1739+
1740+
let expected = "\
1741+
Filter: test.b > Int64(10)\
1742+
\n WindowAggr: windowExpr=[[rank() PARTITION BY [test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, rank() PARTITION BY [test.b, test.a] ORDER BY [test.c ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
1743+
\n TableScan: test";
1744+
assert_optimized_plan_eq(plan, expected)
1745+
}
1746+
14451747
/// verifies that a filter is pushed to before a projection, the filter expression is correctly re-written
14461748
#[test]
14471749
fn alias() -> Result<()> {

0 commit comments

Comments
 (0)