diff --git a/src/query/service/src/pipelines/builders/builder_window.rs b/src/query/service/src/pipelines/builders/builder_window.rs index 41043661a364..b33f932f6f69 100644 --- a/src/query/service/src/pipelines/builders/builder_window.rs +++ b/src/query/service/src/pipelines/builders/builder_window.rs @@ -56,22 +56,6 @@ impl PipelineBuilder { .collect::>>()?; let old_output_len = self.main_pipeline.output_len(); - if !partition_by.is_empty() || !order_by.is_empty() { - let mut sort_desc = Vec::with_capacity(partition_by.len() + order_by.len()); - - for offset in &partition_by { - sort_desc.push(SortColumnDescription { - offset: *offset, - asc: true, - nulls_first: true, - is_nullable: input_schema.field(*offset).is_nullable(), // This information is not needed here. - }) - } - - sort_desc.extend(order_by.clone()); - - self.build_sort_pipeline(input_schema.clone(), sort_desc, window.limit, None)?; - } // `TransformWindow` is a pipeline breaker. self.main_pipeline.try_resize(1)?; let func = WindowFunctionInfo::try_create(&window.func, &input_schema)?; diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs index a0628527852e..f8016c9e3ff1 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs @@ -233,7 +233,7 @@ impl Debug for AggregateMeta f.debug_struct("AggregateMeta::Serialized").finish() } AggregateMeta::Spilling(_) => f.debug_struct("Aggregate::Spilling").finish(), - AggregateMeta::Spilled(_) => f.debug_struct("Aggregate::Spilling").finish(), + AggregateMeta::Spilled(_) => f.debug_struct("Aggregate::Spilled").finish(), AggregateMeta::BucketSpilled(_) => f.debug_struct("Aggregate::BucketSpilled").finish(), AggregateMeta::AggregatePayload(_) => { f.debug_struct("AggregateMeta:AggregatePayload").finish() diff --git a/src/query/sql/src/executor/physical_plans/physical_window.rs b/src/query/sql/src/executor/physical_plans/physical_window.rs index fccf386c873a..598a86225329 100644 --- a/src/query/sql/src/executor/physical_plans/physical_window.rs +++ b/src/query/sql/src/executor/physical_plans/physical_window.rs @@ -36,7 +36,6 @@ use crate::executor::physical_plans::common::SortDesc; use crate::executor::PhysicalPlan; use crate::executor::PhysicalPlanBuilder; use crate::optimizer::SExpr; -use crate::plans::ScalarItem; use crate::plans::WindowFuncFrame; use crate::plans::WindowFuncFrameBound; use crate::plans::WindowFuncType; @@ -152,7 +151,7 @@ impl PhysicalPlanBuilder { s_expr: &SExpr, window: &crate::plans::Window, mut required: ColumnSet, - stat_info: PlanStatsInfo, + _stat_info: PlanStatsInfo, ) -> Result { // 1. DO NOT Prune unused Columns cause window may not in required, eg: // select s1.a from ( select t1.a as a, dense_rank() over(order by t1.a desc) as rk @@ -175,37 +174,12 @@ impl PhysicalPlanBuilder { required.insert(item.order_by_item.index); }); - let column_projections = required.clone().into_iter().collect::>(); - // 2. Build physical plan. let input = self.build(s_expr.child(0)?, required).await?; let mut w = window.clone(); - // Generate a `EvalScalar` as the input of `Window`. - let mut scalar_items: Vec = Vec::new(); - for arg in &w.arguments { - scalar_items.push(arg.clone()); - } - for part in &w.partition_by { - scalar_items.push(part.clone()); - } - for order in &w.order_by { - scalar_items.push(order.order_by_item.clone()) - } - let input = if !scalar_items.is_empty() { - self.create_eval_scalar( - &crate::planner::plans::EvalScalar { - items: scalar_items, - }, - column_projections, - input, - stat_info.clone(), - )? - } else { - input - }; + let input_schema = input.output_schema()?; - // Unify the data type for range frame. if w.frame.units.is_range() && w.order_by.len() == 1 { let order_by = &mut w.order_by[0].order_by_item.scalar; diff --git a/src/query/sql/src/planner/binder/window.rs b/src/query/sql/src/planner/binder/window.rs index d44cfb8bb988..2e810557391e 100644 --- a/src/query/sql/src/planner/binder/window.rs +++ b/src/query/sql/src/planner/binder/window.rs @@ -28,10 +28,13 @@ use crate::optimizer::SExpr; use crate::plans::walk_expr_mut; use crate::plans::AggregateFunction; use crate::plans::BoundColumnRef; +use crate::plans::EvalScalar; use crate::plans::LagLeadFunction; use crate::plans::NthValueFunction; use crate::plans::ScalarExpr; use crate::plans::ScalarItem; +use crate::plans::Sort; +use crate::plans::SortItem; use crate::plans::SubqueryExpr; use crate::plans::VisitorMut; use crate::plans::Window; @@ -63,6 +66,66 @@ impl Binder { limit: None, }; + // eval scalars before sort + // Generate a `EvalScalar` as the input of `Window`. + let mut scalar_items: Vec = Vec::new(); + for arg in &window_plan.arguments { + scalar_items.push(arg.clone()); + } + for part in &window_plan.partition_by { + scalar_items.push(part.clone()); + } + for order in &window_plan.order_by { + scalar_items.push(order.order_by_item.clone()) + } + + let child = if !scalar_items.is_empty() { + let eval_scalar_plan = EvalScalar { + items: scalar_items, + }; + SExpr::create_unary(Arc::new(eval_scalar_plan.into()), Arc::new(child)) + } else { + child + }; + + let default_nulls_first = !self + .ctx + .get_settings() + .get_sql_dialect() + .unwrap() + .is_null_biggest(); + + let mut sort_items: Vec = vec![]; + if !window_plan.partition_by.is_empty() { + for part in window_plan.partition_by.iter() { + sort_items.push(SortItem { + index: part.index, + asc: true, + nulls_first: default_nulls_first, + }); + } + } + + for order in window_plan.order_by.iter() { + sort_items.push(SortItem { + index: order.order_by_item.index, + asc: order.asc.unwrap_or(true), + nulls_first: order.nulls_first.unwrap_or(default_nulls_first), + }); + } + + let child = if !sort_items.is_empty() { + let sort_plan = Sort { + items: sort_items, + limit: window_plan.limit, + after_exchange: None, + pre_projection: None, + }; + SExpr::create_unary(Arc::new(sort_plan.into()), Arc::new(child)) + } else { + child + }; + Ok(SExpr::create_unary( Arc::new(window_plan.into()), Arc::new(child), diff --git a/src/query/sql/src/planner/optimizer/distributed/sort_and_limit.rs b/src/query/sql/src/planner/optimizer/distributed/sort_and_limit.rs index 2dc99aac4034..4042d921e7bc 100644 --- a/src/query/sql/src/planner/optimizer/distributed/sort_and_limit.rs +++ b/src/query/sql/src/planner/optimizer/distributed/sort_and_limit.rs @@ -108,8 +108,9 @@ impl SortAndLimitPushDownOptimizer { let exchange_sexpr = s_expr.child(0)?; debug_assert!(matches!( exchange_sexpr.plan.as_ref(), - RelOperator::Exchange(Exchange::Merge) + RelOperator::Exchange(Exchange::Merge) | RelOperator::Exchange(Exchange::MergeSort) )); + debug_assert!(exchange_sexpr.children.len() == 1); let exchange_sexpr = exchange_sexpr.replace_plan(Arc::new(Exchange::MergeSort.into())); diff --git a/tests/sqllogictests/suites/mode/cluster/window.test b/tests/sqllogictests/suites/mode/cluster/window.test index 03aa3b612638..bd91c3872158 100644 --- a/tests/sqllogictests/suites/mode/cluster/window.test +++ b/tests/sqllogictests/suites/mode/cluster/window.test @@ -81,39 +81,47 @@ Window ├── partition by: [department_id] ├── order by: [salary] ├── frame: [Range: Preceding(None) ~ CurrentRow] -└── Exchange +└── Sort ├── output columns: [e.name (#1), e.salary (#3), d.department_name (#5), d.department_id (#4)] - ├── exchange type: Merge - └── HashJoin - ├── output columns: [e.name (#1), e.salary (#3), d.department_name (#5), d.department_id (#4)] - ├── join type: INNER - ├── build keys: [d.department_id (#4)] - ├── probe keys: [e.department_id (#2)] - ├── filters: [] - ├── estimated rows: 8.00 - ├── Exchange(Build) - │ ├── output columns: [d.department_id (#4), d.department_name (#5)] - │ ├── exchange type: Broadcast - │ └── TableScan - │ ├── table: default.default.departments - │ ├── output columns: [department_id (#4), department_name (#5)] - │ ├── read rows: 4 - │ ├── read size: < 1 KiB - │ ├── partitions total: 1 - │ ├── partitions scanned: 1 - │ ├── pruning stats: [segments: , blocks: ] - │ ├── push downs: [filters: [], limit: NONE] - │ └── estimated rows: 4.00 - └── TableScan(Probe) - ├── table: default.default.employees - ├── output columns: [name (#1), department_id (#2), salary (#3)] - ├── read rows: 10 - ├── read size: < 1 KiB - ├── partitions total: 1 - ├── partitions scanned: 1 - ├── pruning stats: [segments: , blocks: ] - ├── push downs: [filters: [], limit: NONE] - └── estimated rows: 10.00 + ├── sort keys: [department_id ASC NULLS LAST, salary DESC NULLS LAST] + ├── estimated rows: 8.00 + └── Exchange + ├── output columns: [e.name (#1), e.salary (#3), d.department_name (#5), d.department_id (#4), #_order_col] + ├── exchange type: Merge + └── Sort + ├── output columns: [e.name (#1), e.salary (#3), d.department_name (#5), d.department_id (#4), #_order_col] + ├── sort keys: [department_id ASC NULLS LAST, salary DESC NULLS LAST] + ├── estimated rows: 8.00 + └── HashJoin + ├── output columns: [e.name (#1), e.salary (#3), d.department_name (#5), d.department_id (#4)] + ├── join type: INNER + ├── build keys: [d.department_id (#4)] + ├── probe keys: [e.department_id (#2)] + ├── filters: [] + ├── estimated rows: 8.00 + ├── Exchange(Build) + │ ├── output columns: [d.department_id (#4), d.department_name (#5)] + │ ├── exchange type: Broadcast + │ └── TableScan + │ ├── table: default.default.departments + │ ├── output columns: [department_id (#4), department_name (#5)] + │ ├── read rows: 4 + │ ├── read size: < 1 KiB + │ ├── partitions total: 1 + │ ├── partitions scanned: 1 + │ ├── pruning stats: [segments: , blocks: ] + │ ├── push downs: [filters: [], limit: NONE] + │ └── estimated rows: 4.00 + └── TableScan(Probe) + ├── table: default.default.employees + ├── output columns: [name (#1), department_id (#2), salary (#3)] + ├── read rows: 10 + ├── read size: < 1 KiB + ├── partitions total: 1 + ├── partitions scanned: 1 + ├── pruning stats: [segments: , blocks: ] + ├── push downs: [filters: [], limit: NONE] + └── estimated rows: 10.00 statement ok DROP TABLE employees; diff --git a/tests/sqllogictests/suites/mode/standalone/explain/window.test b/tests/sqllogictests/suites/mode/standalone/explain/window.test index a2fdafddcaca..2fdefccd6b07 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/window.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/window.test @@ -13,16 +13,16 @@ CREATE TABLE empsalary (depname string, empno bigint, salary int, enroll_date da query T explain SELECT depname, empno, salary, sum(salary) OVER (PARTITION BY depname ORDER BY empno) FROM empsalary ORDER BY depname, empno ---- -Sort +Window ├── output columns: [empsalary.depname (#0), empsalary.empno (#1), empsalary.salary (#2), sum(salary) OVER ( PARTITION BY depname ORDER BY empno ) (#4)] -├── sort keys: [depname ASC NULLS LAST, empno ASC NULLS LAST] -├── estimated rows: 0.00 -└── Window - ├── output columns: [empsalary.depname (#0), empsalary.empno (#1), empsalary.salary (#2), sum(salary) OVER ( PARTITION BY depname ORDER BY empno ) (#4)] - ├── aggregate function: [sum(salary)] - ├── partition by: [depname] - ├── order by: [empno] - ├── frame: [Range: Preceding(None) ~ CurrentRow] +├── aggregate function: [sum(salary)] +├── partition by: [depname] +├── order by: [empno] +├── frame: [Range: Preceding(None) ~ CurrentRow] +└── Sort + ├── output columns: [empsalary.depname (#0), empsalary.empno (#1), empsalary.salary (#2)] + ├── sort keys: [depname ASC NULLS LAST, empno ASC NULLS LAST] + ├── estimated rows: 0.00 └── TableScan ├── table: default.test_explain_window.empsalary ├── output columns: [depname (#0), empno (#1), salary (#2)] @@ -44,17 +44,13 @@ query T explain pipeline SELECT depname, empno, salary, sum(salary) OVER (PARTITION BY depname ORDER BY empno) FROM empsalary ORDER BY depname, empno; ---- CompoundBlockOperator(Project) × 1 processor - Merge (TransformSortMerge × 4 processors) to (CompoundBlockOperator(Project) × 1) - TransformSortMerge × 4 processors - SortPartialTransform × 4 processors - Merge (Transform Window × 1 processor) to (SortPartialTransform × 4) - Transform Window × 1 processor - Merge (TransformSortMerge × 4 processors) to (Transform Window × 1) - TransformSortMerge × 4 processors - SortPartialTransform × 4 processors - Merge (DeserializeDataTransform × 1 processor) to (SortPartialTransform × 4) - DeserializeDataTransform × 1 processor - SyncReadParquetDataSource × 1 processor + Transform Window × 1 processor + Merge (TransformSortMerge × 4 processors) to (Transform Window × 1) + TransformSortMerge × 4 processors + SortPartialTransform × 4 processors + Merge (DeserializeDataTransform × 1 processor) to (SortPartialTransform × 4) + DeserializeDataTransform × 1 processor + SyncReadParquetDataSource × 1 processor # Enable sort spilling @@ -65,19 +61,14 @@ query T explain pipeline SELECT depname, empno, salary, sum(salary) OVER (PARTITION BY depname ORDER BY empno) FROM empsalary ORDER BY depname, empno; ---- CompoundBlockOperator(Project) × 1 processor - Merge (TransformSortSpill × 4 processors) to (CompoundBlockOperator(Project) × 1) - TransformSortSpill × 4 processors - TransformSortMerge × 4 processors - SortPartialTransform × 4 processors - Merge (Transform Window × 1 processor) to (SortPartialTransform × 4) - Transform Window × 1 processor - Merge (TransformSortSpill × 4 processors) to (Transform Window × 1) - TransformSortSpill × 4 processors - TransformSortMerge × 4 processors - SortPartialTransform × 4 processors - Merge (DeserializeDataTransform × 1 processor) to (SortPartialTransform × 4) - DeserializeDataTransform × 1 processor - SyncReadParquetDataSource × 1 processor + Transform Window × 1 processor + Merge (TransformSortSpill × 4 processors) to (Transform Window × 1) + TransformSortSpill × 4 processors + TransformSortMerge × 4 processors + SortPartialTransform × 4 processors + Merge (DeserializeDataTransform × 1 processor) to (SortPartialTransform × 4) + DeserializeDataTransform × 1 processor + SyncReadParquetDataSource × 1 processor statement ok @@ -100,35 +91,39 @@ Filter ├── partition by: [k] ├── order by: [v] ├── frame: [Range: Preceding(None) ~ CurrentRow] - └── UnionAll + └── Sort ├── output columns: [test.k (#0), test.v (#1)] + ├── sort keys: [k ASC NULLS LAST, v DESC NULLS LAST] ├── estimated rows: 0.00 - ├── Filter - │ ├── output columns: [test.k (#0), test.v (#1)] - │ ├── filters: [is_true(test.k (#0) = 12)] - │ ├── estimated rows: 0.00 - │ └── TableScan - │ ├── table: default.test_explain_window.test - │ ├── output columns: [k (#0), v (#1)] - │ ├── read rows: 0 - │ ├── read size: 0 - │ ├── partitions total: 0 - │ ├── partitions scanned: 0 - │ ├── push downs: [filters: [is_true(test.k (#0) = 12)], limit: NONE] - │ └── estimated rows: 0.00 - └── Filter - ├── output columns: [test.k (#2), test.v (#3)] - ├── filters: [is_true(test.k (#2) = 12)] + └── UnionAll + ├── output columns: [test.k (#0), test.v (#1)] ├── estimated rows: 0.00 - └── TableScan - ├── table: default.test_explain_window.test - ├── output columns: [k (#2), v (#3)] - ├── read rows: 0 - ├── read size: 0 - ├── partitions total: 0 - ├── partitions scanned: 0 - ├── push downs: [filters: [is_true(test.k (#2) = 12)], limit: NONE] - └── estimated rows: 0.00 + ├── Filter + │ ├── output columns: [test.k (#0), test.v (#1)] + │ ├── filters: [is_true(test.k (#0) = 12)] + │ ├── estimated rows: 0.00 + │ └── TableScan + │ ├── table: default.test_explain_window.test + │ ├── output columns: [k (#0), v (#1)] + │ ├── read rows: 0 + │ ├── read size: 0 + │ ├── partitions total: 0 + │ ├── partitions scanned: 0 + │ ├── push downs: [filters: [is_true(test.k (#0) = 12)], limit: NONE] + │ └── estimated rows: 0.00 + └── Filter + ├── output columns: [test.k (#2), test.v (#3)] + ├── filters: [is_true(test.k (#2) = 12)] + ├── estimated rows: 0.00 + └── TableScan + ├── table: default.test_explain_window.test + ├── output columns: [k (#2), v (#3)] + ├── read rows: 0 + ├── read size: 0 + ├── partitions total: 0 + ├── partitions scanned: 0 + ├── push downs: [filters: [is_true(test.k (#2) = 12)], limit: NONE] + └── estimated rows: 0.00 # cannot push down filter in window function query T @@ -144,27 +139,31 @@ Filter ├── partition by: [v] ├── order by: [v] ├── frame: [Range: Preceding(None) ~ CurrentRow] - └── UnionAll + └── Sort ├── output columns: [test.k (#0), test.v (#1)] + ├── sort keys: [v ASC NULLS LAST, v DESC NULLS LAST] ├── estimated rows: 0.00 - ├── TableScan - │ ├── table: default.test_explain_window.test - │ ├── output columns: [k (#0), v (#1)] - │ ├── read rows: 0 - │ ├── read size: 0 - │ ├── partitions total: 0 - │ ├── partitions scanned: 0 - │ ├── push downs: [filters: [], limit: NONE] - │ └── estimated rows: 0.00 - └── TableScan - ├── table: default.test_explain_window.test - ├── output columns: [k (#2), v (#3)] - ├── read rows: 0 - ├── read size: 0 - ├── partitions total: 0 - ├── partitions scanned: 0 - ├── push downs: [filters: [], limit: NONE] - └── estimated rows: 0.00 + └── UnionAll + ├── output columns: [test.k (#0), test.v (#1)] + ├── estimated rows: 0.00 + ├── TableScan + │ ├── table: default.test_explain_window.test + │ ├── output columns: [k (#0), v (#1)] + │ ├── read rows: 0 + │ ├── read size: 0 + │ ├── partitions total: 0 + │ ├── partitions scanned: 0 + │ ├── push downs: [filters: [], limit: NONE] + │ └── estimated rows: 0.00 + └── TableScan + ├── table: default.test_explain_window.test + ├── output columns: [k (#2), v (#3)] + ├── read rows: 0 + ├── read size: 0 + ├── partitions total: 0 + ├── partitions scanned: 0 + ├── push downs: [filters: [], limit: NONE] + └── estimated rows: 0.00 # cannot push down filter in window function query T @@ -180,27 +179,31 @@ Filter ├── partition by: [] ├── order by: [v] ├── frame: [Range: Preceding(None) ~ CurrentRow] - └── UnionAll + └── Sort ├── output columns: [test.k (#0), test.v (#1)] + ├── sort keys: [v DESC NULLS LAST] ├── estimated rows: 0.00 - ├── TableScan - │ ├── table: default.test_explain_window.test - │ ├── output columns: [k (#0), v (#1)] - │ ├── read rows: 0 - │ ├── read size: 0 - │ ├── partitions total: 0 - │ ├── partitions scanned: 0 - │ ├── push downs: [filters: [], limit: NONE] - │ └── estimated rows: 0.00 - └── TableScan - ├── table: default.test_explain_window.test - ├── output columns: [k (#2), v (#3)] - ├── read rows: 0 - ├── read size: 0 - ├── partitions total: 0 - ├── partitions scanned: 0 - ├── push downs: [filters: [], limit: NONE] - └── estimated rows: 0.00 + └── UnionAll + ├── output columns: [test.k (#0), test.v (#1)] + ├── estimated rows: 0.00 + ├── TableScan + │ ├── table: default.test_explain_window.test + │ ├── output columns: [k (#0), v (#1)] + │ ├── read rows: 0 + │ ├── read size: 0 + │ ├── partitions total: 0 + │ ├── partitions scanned: 0 + │ ├── push downs: [filters: [], limit: NONE] + │ └── estimated rows: 0.00 + └── TableScan + ├── table: default.test_explain_window.test + ├── output columns: [k (#2), v (#3)] + ├── read rows: 0 + ├── read size: 0 + ├── partitions total: 0 + ├── partitions scanned: 0 + ├── push downs: [filters: [], limit: NONE] + └── estimated rows: 0.00 statement ok drop table if exists t @@ -221,15 +224,19 @@ Filter ├── partition by: [a] ├── order by: [] ├── frame: [Range: Preceding(None) ~ Following(None)] - └── TableScan - ├── table: default.test_explain_window.t - ├── output columns: [a (#0)] - ├── read rows: 0 - ├── read size: 0 - ├── partitions total: 0 - ├── partitions scanned: 0 - ├── push downs: [filters: [], limit: NONE] - └── estimated rows: 0.00 + └── Sort + ├── output columns: [t.a (#0)] + ├── sort keys: [a ASC NULLS LAST] + ├── estimated rows: 0.00 + └── TableScan + ├── table: default.test_explain_window.t + ├── output columns: [a (#0)] + ├── read rows: 0 + ├── read size: 0 + ├── partitions total: 0 + ├── partitions scanned: 0 + ├── push downs: [filters: [], limit: NONE] + └── estimated rows: 0.00 ## example from: https://community.snowflake.com/s/article/Pushdown-or-Not-Pushdown statement ok @@ -253,19 +260,23 @@ Window ├── partition by: [] ├── order by: [b] ├── frame: [Range: Preceding(None) ~ CurrentRow] -└── Filter +└── Sort ├── output columns: [tbpush.b (#0)] - ├── filters: [is_true(tbpush.b (#0) > 3)] + ├── sort keys: [b ASC NULLS LAST] ├── estimated rows: 0.00 - └── TableScan - ├── table: default.test_explain_window.tbpush - ├── output columns: [b (#0)] - ├── read rows: 0 - ├── read size: 0 - ├── partitions total: 0 - ├── partitions scanned: 0 - ├── push downs: [filters: [is_true(tbpush.b (#0) > 3)], limit: NONE] - └── estimated rows: 0.00 + └── Filter + ├── output columns: [tbpush.b (#0)] + ├── filters: [is_true(tbpush.b (#0) > 3)] + ├── estimated rows: 0.00 + └── TableScan + ├── table: default.test_explain_window.tbpush + ├── output columns: [b (#0)] + ├── read rows: 0 + ├── read size: 0 + ├── partitions total: 0 + ├── partitions scanned: 0 + ├── push downs: [filters: [is_true(tbpush.b (#0) > 3)], limit: NONE] + └── estimated rows: 0.00 query T explain select * from vwpush where b > 3; @@ -280,15 +291,19 @@ Filter ├── partition by: [] ├── order by: [b] ├── frame: [Range: Preceding(None) ~ CurrentRow] - └── TableScan - ├── table: default.test_explain_window.tbpush - ├── output columns: [b (#0)] - ├── read rows: 0 - ├── read size: 0 - ├── partitions total: 0 - ├── partitions scanned: 0 - ├── push downs: [filters: [], limit: NONE] - └── estimated rows: 0.00 + └── Sort + ├── output columns: [tbpush.b (#0)] + ├── sort keys: [b ASC NULLS LAST] + ├── estimated rows: 0.00 + └── TableScan + ├── table: default.test_explain_window.tbpush + ├── output columns: [b (#0)] + ├── read rows: 0 + ├── read size: 0 + ├── partitions total: 0 + ├── partitions scanned: 0 + ├── push downs: [filters: [], limit: NONE] + └── estimated rows: 0.00 query T explain select * from (select b, row_number() over (order by b) from tbpush) where b > 3; @@ -303,15 +318,19 @@ Filter ├── partition by: [] ├── order by: [b] ├── frame: [Range: Preceding(None) ~ CurrentRow] - └── TableScan - ├── table: default.test_explain_window.tbpush - ├── output columns: [b (#0)] - ├── read rows: 0 - ├── read size: 0 - ├── partitions total: 0 - ├── partitions scanned: 0 - ├── push downs: [filters: [], limit: NONE] - └── estimated rows: 0.00 + └── Sort + ├── output columns: [tbpush.b (#0)] + ├── sort keys: [b ASC NULLS LAST] + ├── estimated rows: 0.00 + └── TableScan + ├── table: default.test_explain_window.tbpush + ├── output columns: [b (#0)] + ├── read rows: 0 + ├── read size: 0 + ├── partitions total: 0 + ├── partitions scanned: 0 + ├── push downs: [filters: [], limit: NONE] + └── estimated rows: 0.00 # test push down limit to window function statement ok @@ -370,8 +389,8 @@ explain pipeline select a, dense_rank() over (partition by a order by a desc) fr CompoundBlockOperator(Project) × 1 processor LimitTransform × 1 processor Transform Window × 1 processor - Merge (TransformSortMergeLimit × 4 processors) to (Transform Window × 1) - TransformSortMergeLimit × 4 processors + Merge (TransformSortMerge × 4 processors) to (Transform Window × 1) + TransformSortMerge × 4 processors SortPartialTransform × 4 processors Merge (DeserializeDataTransform × 1 processor) to (SortPartialTransform × 4) DeserializeDataTransform × 1 processor @@ -384,8 +403,8 @@ explain pipeline select a, sum(a) over (partition by a order by a desc rows betw CompoundBlockOperator(Project) × 1 processor LimitTransform × 1 processor Transform Window × 1 processor - Merge (TransformSortMergeLimit × 4 processors) to (Transform Window × 1) - TransformSortMergeLimit × 4 processors + Merge (TransformSortMerge × 4 processors) to (Transform Window × 1) + TransformSortMerge × 4 processors SortPartialTransform × 4 processors Merge (DeserializeDataTransform × 1 processor) to (SortPartialTransform × 4) DeserializeDataTransform × 1 processor @@ -469,19 +488,23 @@ RowFetch ├── partition by: [a] ├── order by: [a] ├── frame: [Rows: Preceding(None) ~ CurrentRow] - └── Filter + └── Sort ├── output columns: [t.a (#0), t.b (#1), t._row_id (#7)] - ├── filters: [is_true(t.a (#0) > 1)] + ├── sort keys: [a ASC NULLS LAST, a DESC NULLS LAST] ├── estimated rows: 0.00 - └── TableScan - ├── table: default.test_explain_window.t - ├── output columns: [a (#0), b (#1), _row_id (#7)] - ├── read rows: 0 - ├── read size: 0 - ├── partitions total: 0 - ├── partitions scanned: 0 - ├── push downs: [filters: [is_true(t.a (#0) > 1)], limit: NONE] - └── estimated rows: 0.00 + └── Filter + ├── output columns: [t.a (#0), t.b (#1), t._row_id (#7)] + ├── filters: [is_true(t.a (#0) > 1)] + ├── estimated rows: 0.00 + └── TableScan + ├── table: default.test_explain_window.t + ├── output columns: [a (#0), b (#1), _row_id (#7)] + ├── read rows: 0 + ├── read size: 0 + ├── partitions total: 0 + ├── partitions scanned: 0 + ├── push downs: [filters: [is_true(t.a (#0) > 1)], limit: NONE] + └── estimated rows: 0.00 statement ok drop table if exists table43764_orc @@ -504,5 +527,22 @@ CompoundBlockOperator(Project) × 1 processor DeserializeDataTransform × 1 processor SyncReadParquetDataSource × 1 processor +# same order multi window +query T +explain pipeline select *,lead(number,1, 42) over (order by number), lead(number,2,44) over (order by number), lead(number,3,44) over (order by number) from numbers(5); +---- +CompoundBlockOperator(Project) × 1 processor + Transform Window × 1 processor + CompoundBlockOperator(Map) × 1 processor + Transform Window × 1 processor + CompoundBlockOperator(Map) × 1 processor + Transform Window × 1 processor + Merge (TransformSortMerge × 4 processors) to (Transform Window × 1) + TransformSortMerge × 4 processors + SortPartialTransform × 4 processors + Merge (CompoundBlockOperator(Map) × 1 processor) to (SortPartialTransform × 4) + CompoundBlockOperator(Map) × 1 processor + NumbersSourceTransform × 1 processor + statement ok DROP DATABASE test_explain_window; diff --git a/tests/sqllogictests/suites/tpcds/tpcds_join_order.test b/tests/sqllogictests/suites/tpcds/tpcds_join_order.test index da4f3eca1bfd..f780c3968446 100644 --- a/tests/sqllogictests/suites/tpcds/tpcds_join_order.test +++ b/tests/sqllogictests/suites/tpcds/tpcds_join_order.test @@ -6841,27 +6841,27 @@ ORDER BY lochierarchy DESC , rank_within_parent LIMIT 100; ---- -HashJoin: INNER +HashJoin: LEFT SEMI ├── Build -│ └── Scan: default.tpcds.date_dim (#1) (read rows: 73049) +│ └── HashJoin: INNER +│ ├── Build +│ │ └── Scan: default.tpcds.store (#4) (read rows: 1) +│ └── Probe +│ └── HashJoin: INNER +│ ├── Build +│ │ └── Scan: default.tpcds.date_dim (#5) (read rows: 73049) +│ └── Probe +│ └── Scan: default.tpcds.store_sales (#3) (read rows: 28810) └── Probe └── HashJoin: INNER ├── Build - │ └── HashJoin: INNER - │ ├── Build - │ │ └── HashJoin: INNER - │ │ ├── Build - │ │ │ └── Scan: default.tpcds.store (#4) (read rows: 1) - │ │ └── Probe - │ │ └── HashJoin: INNER - │ │ ├── Build - │ │ │ └── Scan: default.tpcds.date_dim (#5) (read rows: 73049) - │ │ └── Probe - │ │ └── Scan: default.tpcds.store_sales (#3) (read rows: 28810) - │ └── Probe - │ └── Scan: default.tpcds.store (#2) (read rows: 1) + │ └── Scan: default.tpcds.store (#2) (read rows: 1) └── Probe - └── Scan: default.tpcds.store_sales (#0) (read rows: 28810) + └── HashJoin: INNER + ├── Build + │ └── Scan: default.tpcds.date_dim (#1) (read rows: 73049) + └── Probe + └── Scan: default.tpcds.store_sales (#0) (read rows: 28810) # Q71 query I