Skip to content

Commit 00a36df

Browse files
lewiszlwfindepi
authored andcommitted
Fix window expr deserialization (apache#10506)
* Fix window expr deserialization * Improve naming and doc * Update window test
1 parent 35cb248 commit 00a36df

File tree

4 files changed

+38
-37
lines changed

4 files changed

+38
-37
lines changed

datafusion/core/tests/fuzz_cases/window_fuzz.rs

Lines changed: 3 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,10 @@ use arrow::compute::{concat_batches, SortOptions};
2222
use arrow::datatypes::SchemaRef;
2323
use arrow::record_batch::RecordBatch;
2424
use arrow::util::pretty::pretty_format_batches;
25-
use arrow_schema::{Field, Schema};
2625
use datafusion::physical_plan::memory::MemoryExec;
2726
use datafusion::physical_plan::sorts::sort::SortExec;
2827
use datafusion::physical_plan::windows::{
29-
create_window_expr, BoundedWindowAggExec, WindowAggExec,
28+
create_window_expr, schema_add_window_field, BoundedWindowAggExec, WindowAggExec,
3029
};
3130
use datafusion::physical_plan::InputOrderMode::{Linear, PartiallySorted, Sorted};
3231
use datafusion::physical_plan::{collect, InputOrderMode};
@@ -40,7 +39,6 @@ use datafusion_expr::{
4039
};
4140
use datafusion_physical_expr::expressions::{cast, col, lit};
4241
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
43-
use itertools::Itertools;
4442
use test_utils::add_empty_batches;
4543

4644
use hashbrown::HashMap;
@@ -276,7 +274,7 @@ async fn bounded_window_causal_non_causal() -> Result<()> {
276274
};
277275

278276
let extended_schema =
279-
schema_add_window_fields(&args, &schema, &window_fn, fn_name)?;
277+
schema_add_window_field(&args, &schema, &window_fn, fn_name)?;
280278

281279
let window_expr = create_window_expr(
282280
&window_fn,
@@ -683,7 +681,7 @@ async fn run_window_test(
683681
exec1 = Arc::new(SortExec::new(sort_keys, exec1)) as _;
684682
}
685683

686-
let extended_schema = schema_add_window_fields(&args, &schema, &window_fn, &fn_name)?;
684+
let extended_schema = schema_add_window_field(&args, &schema, &window_fn, &fn_name)?;
687685

688686
let usual_window_exec = Arc::new(WindowAggExec::try_new(
689687
vec![create_window_expr(
@@ -754,32 +752,6 @@ async fn run_window_test(
754752
Ok(())
755753
}
756754

757-
// The planner has fully updated schema before calling the `create_window_expr`
758-
// Replicate the same for this test
759-
fn schema_add_window_fields(
760-
args: &[Arc<dyn PhysicalExpr>],
761-
schema: &Arc<Schema>,
762-
window_fn: &WindowFunctionDefinition,
763-
fn_name: &str,
764-
) -> Result<Arc<Schema>> {
765-
let data_types = args
766-
.iter()
767-
.map(|e| e.clone().as_ref().data_type(schema))
768-
.collect::<Result<Vec<_>>>()?;
769-
let window_expr_return_type = window_fn.return_type(&data_types)?;
770-
let mut window_fields = schema
771-
.fields()
772-
.iter()
773-
.map(|f| f.as_ref().clone())
774-
.collect_vec();
775-
window_fields.extend_from_slice(&[Field::new(
776-
fn_name,
777-
window_expr_return_type,
778-
true,
779-
)]);
780-
Ok(Arc::new(Schema::new(window_fields)))
781-
}
782-
783755
/// Return randomly sized record batches with:
784756
/// three sorted int32 columns 'a', 'b', 'c' ranged from 0..DISTINCT as columns
785757
/// one random int32 column x

datafusion/physical-plan/src/windows/mod.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ use datafusion_physical_expr::{
4242
window::{BuiltInWindowFunctionExpr, SlidingAggregateWindowExpr},
4343
AggregateExpr, EquivalenceProperties, LexOrdering, PhysicalSortRequirement,
4444
};
45+
use itertools::Itertools;
4546

4647
mod bounded_window_agg_exec;
4748
mod window_agg_exec;
@@ -52,6 +53,31 @@ pub use datafusion_physical_expr::window::{
5253
};
5354
pub use window_agg_exec::WindowAggExec;
5455

56+
/// Build field from window function and add it into schema
57+
pub fn schema_add_window_field(
58+
args: &[Arc<dyn PhysicalExpr>],
59+
schema: &Schema,
60+
window_fn: &WindowFunctionDefinition,
61+
fn_name: &str,
62+
) -> Result<Arc<Schema>> {
63+
let data_types = args
64+
.iter()
65+
.map(|e| e.clone().as_ref().data_type(schema))
66+
.collect::<Result<Vec<_>>>()?;
67+
let window_expr_return_type = window_fn.return_type(&data_types)?;
68+
let mut window_fields = schema
69+
.fields()
70+
.iter()
71+
.map(|f| f.as_ref().clone())
72+
.collect_vec();
73+
window_fields.extend_from_slice(&[Field::new(
74+
fn_name,
75+
window_expr_return_type,
76+
false,
77+
)]);
78+
Ok(Arc::new(Schema::new(window_fields)))
79+
}
80+
5581
/// Create a physical expression for window function
5682
#[allow(clippy::too_many_arguments)]
5783
pub fn create_window_expr(

datafusion/proto/src/physical_plan/from_proto.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use datafusion::physical_plan::expressions::{
4040
in_list, BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, LikeExpr,
4141
Literal, NegativeExpr, NotExpr, TryCastExpr,
4242
};
43-
use datafusion::physical_plan::windows::create_window_expr;
43+
use datafusion::physical_plan::windows::{create_window_expr, schema_add_window_field};
4444
use datafusion::physical_plan::{
4545
ColumnStatistics, Partitioning, PhysicalExpr, Statistics, WindowExpr,
4646
};
@@ -155,14 +155,18 @@ pub fn parse_physical_window_expr(
155155
)
156156
})?;
157157

158+
let fun: WindowFunctionDefinition = convert_required!(proto.window_function)?;
159+
let name = proto.name.clone();
160+
let extended_schema =
161+
schema_add_window_field(&window_node_expr, input_schema, &fun, &name)?;
158162
create_window_expr(
159-
&convert_required!(proto.window_function)?,
160-
proto.name.clone(),
163+
&fun,
164+
name,
161165
&window_node_expr,
162166
&partition_by,
163167
&order_by,
164168
Arc::new(window_frame),
165-
input_schema,
169+
&extended_schema,
166170
false,
167171
)
168172
}

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -253,8 +253,7 @@ fn roundtrip_nested_loop_join() -> Result<()> {
253253
fn roundtrip_window() -> Result<()> {
254254
let field_a = Field::new("a", DataType::Int64, false);
255255
let field_b = Field::new("b", DataType::Int64, false);
256-
let field_c = Field::new("FIRST_VALUE(a) PARTITION BY [b] ORDER BY [a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", DataType::Int64, false);
257-
let schema = Arc::new(Schema::new(vec![field_a, field_b, field_c]));
256+
let schema = Arc::new(Schema::new(vec![field_a, field_b]));
258257

259258
let window_frame = WindowFrame::new_bounds(
260259
datafusion_expr::WindowFrameUnits::Range,

0 commit comments

Comments
 (0)