support window functions (#1112)
* support window functions

* clippy

* new test for distributed planner for a window expression query, enable previously disabled test
onursatici authored Nov 19, 2024
1 parent 68b2277 commit 5b6b50b
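
With the WindowAggExec special case removed from the planner, queries containing window operators now fall through to the generic stage-planning branch instead of failing with BallistaError::NotImplemented. Below is a minimal sketch of exercising this, distilled from the test added in this commit (datafusion_test_context is a test utility in this crate; the simplified query is illustrative and not part of the commit itself):

    use crate::planner::DistributedPlanner;
    use crate::test_utils::datafusion_test_context;
    use ballista_core::error::BallistaError;
    use datafusion::physical_plan::displayable;
    use uuid::Uuid;

    async fn plan_window_query() -> Result<(), BallistaError> {
        // Session over the bundled test data, as in the planner tests.
        let ctx = datafusion_test_context("testdata").await?;
        let session_state = ctx.state();

        // A ranking window function partitioned by l_shipmode.
        let df = ctx
            .sql(
                "select l_shipmode, l_shipdate, \
                 rank() over (partition by l_shipmode order by l_shipdate desc) rk \
                 from lineitem",
            )
            .await?;

        let plan = df.into_optimized_plan()?;
        let plan = session_state.optimize(&plan)?;
        let plan = session_state.create_physical_plan(&plan).await?;

        // Before this change the distributed planner rejected plans containing
        // window operators; now the query is split into shuffle-bounded stages.
        let mut planner = DistributedPlanner::new();
        let stages = planner.plan_query_stages(&Uuid::new_v4().to_string(), plan)?;
        for (i, stage) in stages.iter().enumerate() {
            println!("Stage {i}:\n{}", displayable(stage.as_ref()).indent(false));
        }
        Ok(())
    }
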
131 changes: 121 additions & 10 deletions ballista/scheduler/src/planner.rs
@@ -28,7 +28,6 @@ use ballista_core::{
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion::physical_plan::windows::WindowAggExec;
use datafusion::physical_plan::{
with_new_children_if_necessary, ExecutionPlan, Partitioning,
};
@@ -148,12 +147,6 @@ impl DistributedPlanner {
Ok((children[0].clone(), stages))
}
}
} else if let Some(window) =
execution_plan.as_any().downcast_ref::<WindowAggExec>()
{
Err(BallistaError::NotImplemented(format!(
"WindowAggExec with window {window:?}"
)))
} else {
Ok((
with_new_children_if_necessary(execution_plan, children)?,
@@ -305,15 +298,20 @@ mod test {
use crate::planner::DistributedPlanner;
use crate::test_utils::datafusion_test_context;
use ballista_core::error::BallistaError;
use ballista_core::execution_plans::UnresolvedShuffleExec;
use ballista_core::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec};
use ballista_core::serde::BallistaCodec;
use datafusion::arrow::compute::SortOptions;
use datafusion::physical_expr::expressions::Column;
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::joins::HashJoinExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion::physical_plan::windows::BoundedWindowAggExec;
use datafusion::physical_plan::{displayable, ExecutionPlan};
use datafusion::physical_plan::{InputOrderMode, Partitioning};
use datafusion::prelude::SessionContext;
use datafusion_proto::physical_plan::AsExecutionPlan;
use datafusion_proto::protobuf::LogicalPlanNode;
@@ -592,8 +590,121 @@ order by
Ok(())
}

#[ignore]
// enable when upgrading Datafusion, a bug is fixed with https://github.com/apache/datafusion/pull/11926/
#[tokio::test]
async fn distributed_window_plan() -> Result<(), BallistaError> {
let ctx = datafusion_test_context("testdata").await?;
let session_state = ctx.state();

// simplified form of TPC-DS query 67
let df = ctx
.sql(
"
select * from (
select
l_shipmode,
l_shipdate,
rank() over (partition by l_shipmode order by l_shipdate desc) rk
from lineitem
) alias1
where rk <= 100 order by l_shipdate, rk;
",
)
.await?;

let plan = df.into_optimized_plan()?;
let plan = session_state.optimize(&plan)?;
let plan = session_state.create_physical_plan(&plan).await?;

let mut planner = DistributedPlanner::new();
let job_uuid = Uuid::new_v4();
let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?;
for (i, stage) in stages.iter().enumerate() {
println!("Stage {i}:\n{}", displayable(stage.as_ref()).indent(false));
}
/*
expected result:
Stage 0:
ShuffleWriterExec: Some(Hash([Column { name: "l_shipmode", index: 1 }], 2))
CsvExec: file_groups={2 groups: [[testdata/lineitem/partition0.tbl], [testdata/lineitem/partition1.tbl]]}, projection=[l_shipdate, l_shipmode], has_header=false
Stage 1:
ShuffleWriterExec: None
SortExec: expr=[l_shipdate@1 ASC NULLS LAST,rk@2 ASC NULLS LAST], preserve_partitioning=[true]
ProjectionExec: expr=[l_shipmode@1 as l_shipmode, l_shipdate@0 as l_shipdate, RANK() PARTITION BY [lineitem.l_shipmode] ORDER BY [lineitem.l_shipdate DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as rk]
CoalesceBatchesExec: target_batch_size=8192
FilterExec: RANK() PARTITION BY [lineitem.l_shipmode] ORDER BY [lineitem.l_shipdate DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 <= 100
BoundedWindowAggExec: wdw=[RANK() PARTITION BY [lineitem.l_shipmode] ORDER BY [lineitem.l_shipdate DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "RANK() PARTITION BY [lineitem.l_shipmode] ORDER BY [lineitem.l_shipdate DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(IntervalMonthDayNano("NULL")), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]
SortExec: expr=[l_shipmode@1 ASC NULLS LAST,l_shipdate@0 DESC], preserve_partitioning=[true]
CoalesceBatchesExec: target_batch_size=8192
UnresolvedShuffleExec
Stage 2:
ShuffleWriterExec: None
SortPreservingMergeExec: [l_shipdate@1 ASC NULLS LAST,rk@2 ASC NULLS LAST]
UnresolvedShuffleExec
*/

assert_eq!(3, stages.len());

// stage0
let stage0 = stages[0].clone();
let shuffle_write = downcast_exec!(stage0, ShuffleWriterExec);
let partitioning = shuffle_write.shuffle_output_partitioning().expect("stage0");
assert_eq!(2, partitioning.partition_count());
let partition_col = match partitioning {
Partitioning::Hash(exprs, 2) => match exprs.as_slice() {
[ref col] => col.as_any().downcast_ref::<Column>(),
_ => None,
},
_ => None,
};
assert_eq!(Some(&Column::new("l_shipmode", 1)), partition_col);

// stage1
let sort = downcast_exec!(stages[1].children()[0], SortExec);
let projection = downcast_exec!(sort.children()[0], ProjectionExec);
let coalesce = downcast_exec!(projection.children()[0], CoalesceBatchesExec);
let filter = downcast_exec!(coalesce.children()[0], FilterExec);
let window = downcast_exec!(filter.children()[0], BoundedWindowAggExec);
let partition_by = match window.partition_keys.as_slice() {
[ref col] => col.as_any().downcast_ref::<Column>(),
_ => None,
};
assert_eq!(Some(&Column::new("l_shipmode", 1)), partition_by);
assert_eq!(InputOrderMode::Sorted, window.input_order_mode);
let sort = downcast_exec!(window.children()[0], SortExec);
match sort.expr() {
[expr1, expr2] => {
assert_eq!(
SortOptions {
descending: false,
nulls_first: false
},
expr1.options
);
assert_eq!(
Some(&Column::new("l_shipmode", 1)),
expr1.expr.as_any().downcast_ref()
);
assert_eq!(
SortOptions {
descending: true,
nulls_first: true
},
expr2.options
);
assert_eq!(
Some(&Column::new("l_shipdate", 0)),
expr2.expr.as_any().downcast_ref()
);
}
_ => panic!("invalid sort {:?}", sort),
};

Ok(())
}

#[tokio::test]
async fn roundtrip_serde_aggregate() -> Result<(), BallistaError> {
let ctx = datafusion_test_context("testdata").await?;