chore(query): add sort plan before window plan #15616

Merged · 6 commits · May 23, 2024
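As the diffs below show, the sort backing a window's PARTITION BY / ORDER BY used to be injected silently by the pipeline builder; it is now bound as an explicit Sort plan (with an EvalScalar beneath it for the window's arguments and keys) directly under the Window operator, where the optimizer and EXPLAIN can see it. A minimal, self-contained sketch of the key-ordering rule the binder applies — partition keys first (always ascending, dialect-default null ordering), then the window's own ORDER BY items — using toy types rather than Databend's real `SortItem`:

```rust
// Toy stand-in for Databend's SortItem; the fields mirror the binder code below.
#[derive(Debug, Clone)]
struct SortItem {
    index: usize, // column index in the input schema
    asc: bool,
    nulls_first: bool,
}

/// Partition keys come first (always ASC, dialect-default null ordering),
/// followed by the window's ORDER BY items with their own asc/nulls_first.
fn window_sort_items(
    partition_by: &[usize],
    order_by: &[(usize, Option<bool>, Option<bool>)], // (index, asc?, nulls_first?)
    default_nulls_first: bool,
) -> Vec<SortItem> {
    let mut items: Vec<SortItem> = partition_by
        .iter()
        .map(|&index| SortItem { index, asc: true, nulls_first: default_nulls_first })
        .collect();
    items.extend(order_by.iter().map(|&(index, asc, nulls_first)| SortItem {
        index,
        asc: asc.unwrap_or(true),
        nulls_first: nulls_first.unwrap_or(default_nulls_first),
    }));
    items
}

fn main() {
    // e.g. OVER (PARTITION BY department_id ORDER BY salary DESC), as in the test below.
    let items = window_sort_items(&[2], &[(3, Some(false), None)], false);
    println!("{items:?}");
}
```

The real binder reads the null-ordering default from the session's SQL dialect (`is_null_biggest()`); here it is just a parameter.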
16 changes: 0 additions & 16 deletions src/query/service/src/pipelines/builders/builder_window.rs
@@ -56,22 +56,6 @@ impl PipelineBuilder {
             .collect::<Result<Vec<_>>>()?;
 
         let old_output_len = self.main_pipeline.output_len();
-        if !partition_by.is_empty() || !order_by.is_empty() {
-            let mut sort_desc = Vec::with_capacity(partition_by.len() + order_by.len());
-
-            for offset in &partition_by {
-                sort_desc.push(SortColumnDescription {
-                    offset: *offset,
-                    asc: true,
-                    nulls_first: true,
-                    is_nullable: input_schema.field(*offset).is_nullable(), // This information is not needed here.
-                })
-            }
-
-            sort_desc.extend(order_by.clone());
-
-            self.build_sort_pipeline(input_schema.clone(), sort_desc, window.limit, None)?;
-        }
         // `TransformWindow` is a pipeline breaker.
         self.main_pipeline.try_resize(1)?;
         let func = WindowFunctionInfo::try_create(&window.func, &input_schema)?;
@@ -233,7 +233,7 @@ impl<Method: HashMethodBounds, V: Send + Sync + 'static> Debug for AggregateMeta
                 f.debug_struct("AggregateMeta::Serialized").finish()
             }
             AggregateMeta::Spilling(_) => f.debug_struct("Aggregate::Spilling").finish(),
-            AggregateMeta::Spilled(_) => f.debug_struct("Aggregate::Spilling").finish(),
+            AggregateMeta::Spilled(_) => f.debug_struct("Aggregate::Spilled").finish(),
             AggregateMeta::BucketSpilled(_) => f.debug_struct("Aggregate::BucketSpilled").finish(),
             AggregateMeta::AggregatePayload(_) => {
                 f.debug_struct("AggregateMeta:AggregatePayload").finish()
30 changes: 2 additions & 28 deletions src/query/sql/src/executor/physical_plans/physical_window.rs
@@ -36,7 +36,6 @@ use crate::executor::physical_plans::common::SortDesc;
 use crate::executor::PhysicalPlan;
 use crate::executor::PhysicalPlanBuilder;
 use crate::optimizer::SExpr;
-use crate::plans::ScalarItem;
 use crate::plans::WindowFuncFrame;
 use crate::plans::WindowFuncFrameBound;
 use crate::plans::WindowFuncType;
@@ -152,7 +151,7 @@ impl PhysicalPlanBuilder {
         s_expr: &SExpr,
         window: &crate::plans::Window,
         mut required: ColumnSet,
-        stat_info: PlanStatsInfo,
+        _stat_info: PlanStatsInfo,
     ) -> Result<PhysicalPlan> {
         // 1. DO NOT Prune unused Columns cause window may not in required, eg:
         // select s1.a from ( select t1.a as a, dense_rank() over(order by t1.a desc) as rk
@@ -175,37 +174,12 @@
             required.insert(item.order_by_item.index);
         });
 
-        let column_projections = required.clone().into_iter().collect::<Vec<_>>();
-
         // 2. Build physical plan.
         let input = self.build(s_expr.child(0)?, required).await?;
         let mut w = window.clone();
-        // Generate a `EvalScalar` as the input of `Window`.
-        let mut scalar_items: Vec<ScalarItem> = Vec::new();
-        for arg in &w.arguments {
-            scalar_items.push(arg.clone());
-        }
-        for part in &w.partition_by {
-            scalar_items.push(part.clone());
-        }
-        for order in &w.order_by {
-            scalar_items.push(order.order_by_item.clone())
-        }
-        let input = if !scalar_items.is_empty() {
-            self.create_eval_scalar(
-                &crate::planner::plans::EvalScalar {
-                    items: scalar_items,
-                },
-                column_projections,
-                input,
-                stat_info.clone(),
-            )?
-        } else {
-            input
-        };
-
         let input_schema = input.output_schema()?;
 
         // Unify the data type for range frame.
         if w.frame.units.is_range() && w.order_by.len() == 1 {
             let order_by = &mut w.order_by[0].order_by_item.scalar;
+
63 changes: 63 additions & 0 deletions src/query/sql/src/planner/binder/window.rs
@@ -28,10 +28,13 @@ use crate::optimizer::SExpr;
 use crate::plans::walk_expr_mut;
 use crate::plans::AggregateFunction;
 use crate::plans::BoundColumnRef;
+use crate::plans::EvalScalar;
 use crate::plans::LagLeadFunction;
 use crate::plans::NthValueFunction;
 use crate::plans::ScalarExpr;
 use crate::plans::ScalarItem;
+use crate::plans::Sort;
+use crate::plans::SortItem;
 use crate::plans::SubqueryExpr;
 use crate::plans::VisitorMut;
 use crate::plans::Window;
@@ -63,6 +66,66 @@ impl Binder {
             limit: None,
         };
 
+        // eval scalars before sort
+        // Generate a `EvalScalar` as the input of `Window`.
+        let mut scalar_items: Vec<ScalarItem> = Vec::new();
+        for arg in &window_plan.arguments {
+            scalar_items.push(arg.clone());
+        }
+        for part in &window_plan.partition_by {
+            scalar_items.push(part.clone());
+        }
+        for order in &window_plan.order_by {
+            scalar_items.push(order.order_by_item.clone())
+        }
+
+        let child = if !scalar_items.is_empty() {
+            let eval_scalar_plan = EvalScalar {
+                items: scalar_items,
+            };
+            SExpr::create_unary(Arc::new(eval_scalar_plan.into()), Arc::new(child))
+        } else {
+            child
+        };
+
+        let default_nulls_first = !self
+            .ctx
+            .get_settings()
+            .get_sql_dialect()
+            .unwrap()
+            .is_null_biggest();
+
+        let mut sort_items: Vec<SortItem> = vec![];
+        if !window_plan.partition_by.is_empty() {
+            for part in window_plan.partition_by.iter() {
+                sort_items.push(SortItem {
+                    index: part.index,
+                    asc: true,
+                    nulls_first: default_nulls_first,
+                });
+            }
+        }
+
+        for order in window_plan.order_by.iter() {
+            sort_items.push(SortItem {
+                index: order.order_by_item.index,
+                asc: order.asc.unwrap_or(true),
+                nulls_first: order.nulls_first.unwrap_or(default_nulls_first),
+            });
+        }
+
+        let child = if !sort_items.is_empty() {
+            let sort_plan = Sort {
+                items: sort_items,
+                limit: window_plan.limit,
+                after_exchange: None,
+                pre_projection: None,
+            };
+            SExpr::create_unary(Arc::new(sort_plan.into()), Arc::new(child))
+        } else {
+            child
+        };
+
         Ok(SExpr::create_unary(
             Arc::new(window_plan.into()),
             Arc::new(child),
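For orientation, a toy model of the tree this binder code builds — Window on top of Sort on top of EvalScalar — using stand-in enums rather than Databend's real `SExpr`/`RelOperator` types:

```rust
// Toy plan nodes; the real operators are Databend's EvalScalar/Sort/Window RelOperators.
#[derive(Debug)]
enum Plan {
    Input,
    EvalScalar { items: Vec<String>, child: Box<Plan> },
    Sort { keys: Vec<(String, bool, bool)>, child: Box<Plan> }, // (key, asc, nulls_first)
    Window { func: String, child: Box<Plan> },
}

fn bind_window(
    input: Plan,
    func: &str,
    partition_by: &[String],
    order_by: &[(String, bool, bool)],
) -> Plan {
    // Eval scalars before sort: partition and order keys must exist as columns.
    let mut items: Vec<String> = partition_by.to_vec();
    items.extend(order_by.iter().map(|(k, _, _)| k.clone()));
    let child = if items.is_empty() {
        input
    } else {
        Plan::EvalScalar { items, child: Box::new(input) }
    };

    // Sort by partition keys (ASC), then the window's ORDER BY keys.
    let mut keys: Vec<(String, bool, bool)> =
        partition_by.iter().map(|k| (k.clone(), true, false)).collect();
    keys.extend(order_by.iter().cloned());
    let child = if keys.is_empty() {
        child
    } else {
        Plan::Sort { keys, child: Box::new(child) }
    };

    Plan::Window { func: func.to_string(), child: Box::new(child) }
}

fn main() {
    let plan = bind_window(
        Plan::Input,
        "sum(salary)",
        &["department_id".to_string()],
        &[("salary".to_string(), false, false)],
    );
    println!("{plan:#?}");
}
```

Printing the result shows the nesting `Window -> Sort -> EvalScalar -> Input`, matching the order in which the binder wraps `child` above.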
@@ -108,8 +108,9 @@ impl SortAndLimitPushDownOptimizer {
         let exchange_sexpr = s_expr.child(0)?;
         debug_assert!(matches!(
             exchange_sexpr.plan.as_ref(),
-            RelOperator::Exchange(Exchange::Merge)
+            RelOperator::Exchange(Exchange::Merge) | RelOperator::Exchange(Exchange::MergeSort)
         ));
+
         debug_assert!(exchange_sexpr.children.len() == 1);
         let exchange_sexpr = exchange_sexpr.replace_plan(Arc::new(Exchange::MergeSort.into()));
72 changes: 40 additions & 32 deletions tests/sqllogictests/suites/mode/cluster/window.test
@@ -81,39 +81,47 @@ Window
 ├── partition by: [department_id]
 ├── order by: [salary]
 ├── frame: [Range: Preceding(None) ~ CurrentRow]
-└── Exchange
+└── Sort
     ├── output columns: [e.name (#1), e.salary (#3), d.department_name (#5), d.department_id (#4)]
-    ├── exchange type: Merge
-    └── HashJoin
-        ├── output columns: [e.name (#1), e.salary (#3), d.department_name (#5), d.department_id (#4)]
-        ├── join type: INNER
-        ├── build keys: [d.department_id (#4)]
-        ├── probe keys: [e.department_id (#2)]
-        ├── filters: []
-        ├── estimated rows: 8.00
-        ├── Exchange(Build)
-        │   ├── output columns: [d.department_id (#4), d.department_name (#5)]
-        │   ├── exchange type: Broadcast
-        │   └── TableScan
-        │       ├── table: default.default.departments
-        │       ├── output columns: [department_id (#4), department_name (#5)]
-        │       ├── read rows: 4
-        │       ├── read size: < 1 KiB
-        │       ├── partitions total: 1
-        │       ├── partitions scanned: 1
-        │       ├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
-        │       ├── push downs: [filters: [], limit: NONE]
-        │       └── estimated rows: 4.00
-        └── TableScan(Probe)
-            ├── table: default.default.employees
-            ├── output columns: [name (#1), department_id (#2), salary (#3)]
-            ├── read rows: 10
-            ├── read size: < 1 KiB
-            ├── partitions total: 1
-            ├── partitions scanned: 1
-            ├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
-            ├── push downs: [filters: [], limit: NONE]
-            └── estimated rows: 10.00
+    ├── sort keys: [department_id ASC NULLS LAST, salary DESC NULLS LAST]
+    ├── estimated rows: 8.00
+    └── Exchange
+        ├── output columns: [e.name (#1), e.salary (#3), d.department_name (#5), d.department_id (#4), #_order_col]
+        ├── exchange type: Merge
+        └── Sort
+            ├── output columns: [e.name (#1), e.salary (#3), d.department_name (#5), d.department_id (#4), #_order_col]
+            ├── sort keys: [department_id ASC NULLS LAST, salary DESC NULLS LAST]
+            ├── estimated rows: 8.00
+            └── HashJoin
+                ├── output columns: [e.name (#1), e.salary (#3), d.department_name (#5), d.department_id (#4)]
+                ├── join type: INNER
+                ├── build keys: [d.department_id (#4)]
+                ├── probe keys: [e.department_id (#2)]
+                ├── filters: []
+                ├── estimated rows: 8.00
+                ├── Exchange(Build)
+                │   ├── output columns: [d.department_id (#4), d.department_name (#5)]
+                │   ├── exchange type: Broadcast
+                │   └── TableScan
+                │       ├── table: default.default.departments
+                │       ├── output columns: [department_id (#4), department_name (#5)]
+                │       ├── read rows: 4
+                │       ├── read size: < 1 KiB
+                │       ├── partitions total: 1
+                │       ├── partitions scanned: 1
+                │       ├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
+                │       ├── push downs: [filters: [], limit: NONE]
+                │       └── estimated rows: 4.00
+                └── TableScan(Probe)
+                    ├── table: default.default.employees
+                    ├── output columns: [name (#1), department_id (#2), salary (#3)]
+                    ├── read rows: 10
+                    ├── read size: < 1 KiB
+                    ├── partitions total: 1
+                    ├── partitions scanned: 1
+                    ├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
+                    ├── push downs: [filters: [], limit: NONE]
+                    └── estimated rows: 10.00
 
 statement ok
 DROP TABLE employees;
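A plausible reading of the new plan shape, consistent with the SortAndLimitPushDownOptimizer change above: each node sorts its own partial data (the inner Sort), and the Merge exchange is rewritten to Exchange::MergeSort so the coordinator only has to combine streams that are already sorted. A minimal sketch of that k-way merge step, with plain vectors standing in for per-node sorted streams:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge k individually sorted streams into one sorted output,
/// the way a MergeSort-style exchange combines per-node sorted data.
fn merge_sorted(streams: Vec<Vec<i32>>) -> Vec<i32> {
    // Min-heap (via Reverse) of (value, stream id, index within stream).
    let mut heap = BinaryHeap::new();
    for (sid, s) in streams.iter().enumerate() {
        if let Some(&v) = s.first() {
            heap.push(Reverse((v, sid, 0usize)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((v, sid, idx))) = heap.pop() {
        out.push(v);
        if let Some(&next) = streams[sid].get(idx + 1) {
            heap.push(Reverse((next, sid, idx + 1)));
        }
    }
    out
}

fn main() {
    let merged = merge_sorted(vec![vec![1, 4, 9], vec![2, 3, 8], vec![5, 7]]);
    assert_eq!(merged, vec![1, 2, 3, 4, 5, 7, 8, 9]);
    println!("{merged:?}");
}
```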