@@ -24,11 +24,14 @@ use arrow::{
24
24
} ;
25
25
use arrow_schema:: Field ;
26
26
use datafusion:: error:: Result ;
27
+ use datafusion:: functions_aggregate:: average:: avg_udaf;
27
28
use datafusion:: prelude:: * ;
28
29
use datafusion_common:: ScalarValue ;
29
- use datafusion_expr:: function:: WindowUDFFieldArgs ;
30
+ use datafusion_expr:: expr:: WindowFunction ;
31
+ use datafusion_expr:: function:: { WindowFunctionSimplification , WindowUDFFieldArgs } ;
32
+ use datafusion_expr:: simplify:: SimplifyInfo ;
30
33
use datafusion_expr:: {
31
- PartitionEvaluator , Signature , WindowFrame , WindowUDF , WindowUDFImpl ,
34
+ Expr , PartitionEvaluator , Signature , WindowFrame , WindowUDF , WindowUDFImpl ,
32
35
} ;
33
36
use datafusion_functions_window_common:: partition:: PartitionEvaluatorArgs ;
34
37
@@ -142,6 +145,67 @@ impl PartitionEvaluator for MyPartitionEvaluator {
142
145
}
143
146
}
144
147
148
+ /// This UDWF will show how to use the WindowUDFImpl::simplify() API
149
+ #[ derive( Debug , Clone ) ]
150
+ struct SimplifySmoothItUdf {
151
+ signature : Signature ,
152
+ }
153
+
154
+ impl SimplifySmoothItUdf {
155
+ fn new ( ) -> Self {
156
+ Self {
157
+ signature : Signature :: exact (
158
+ // this function will always take one arguments of type f64
159
+ vec ! [ DataType :: Float64 ] ,
160
+ // this function is deterministic and will always return the same
161
+ // result for the same input
162
+ Volatility :: Immutable ,
163
+ ) ,
164
+ }
165
+ }
166
+ }
167
+ impl WindowUDFImpl for SimplifySmoothItUdf {
168
+ fn as_any ( & self ) -> & dyn Any {
169
+ self
170
+ }
171
+
172
+ fn name ( & self ) -> & str {
173
+ "simplify_smooth_it"
174
+ }
175
+
176
+ fn signature ( & self ) -> & Signature {
177
+ & self . signature
178
+ }
179
+
180
+ fn partition_evaluator (
181
+ & self ,
182
+ _partition_evaluator_args : PartitionEvaluatorArgs ,
183
+ ) -> Result < Box < dyn PartitionEvaluator > > {
184
+ todo ! ( )
185
+ }
186
+
187
+ /// this function will simplify `SimplifySmoothItUdf` to `AggregateUDF` for `Avg`
188
+ /// default implementation will not be called (left as `todo!()`)
189
+ fn simplify ( & self ) -> Option < WindowFunctionSimplification > {
190
+ let simplify = |window_function : WindowFunction , _: & dyn SimplifyInfo | {
191
+ Ok ( Expr :: WindowFunction ( WindowFunction {
192
+ fun : datafusion_expr:: WindowFunctionDefinition :: AggregateUDF ( avg_udaf ( ) ) ,
193
+ args : window_function. args ,
194
+ partition_by : window_function. partition_by ,
195
+ order_by : window_function. order_by ,
196
+ window_frame : window_function. window_frame ,
197
+ null_treatment : window_function. null_treatment ,
198
+ } ) )
199
+ } ;
200
+
201
+ Some ( Box :: new ( simplify) )
202
+ }
203
+
204
+ fn field ( & self , field_args : WindowUDFFieldArgs ) -> Result < Field > {
205
+ Ok ( Field :: new ( field_args. name ( ) , DataType :: Float64 , true ) )
206
+ }
207
+ }
208
+
145
209
// create local execution context with `cars.csv` registered as a table named `cars`
146
210
async fn create_context ( ) -> Result < SessionContext > {
147
211
// declare a new context. In the Spark API, this corresponds to a new Spark SQL session
@@ -162,12 +226,15 @@ async fn main() -> Result<()> {
162
226
let smooth_it = WindowUDF :: from ( SmoothItUdf :: new ( ) ) ;
163
227
ctx. register_udwf ( smooth_it. clone ( ) ) ;
164
228
165
- // Use SQL to run the new window function
229
+ let simplify_smooth_it = WindowUDF :: from ( SimplifySmoothItUdf :: new ( ) ) ;
230
+ ctx. register_udwf ( simplify_smooth_it. clone ( ) ) ;
231
+
232
+ // Use SQL to retrieve entire table
166
233
let df = ctx. sql ( "SELECT * from cars" ) . await ?;
167
234
// print the results
168
235
df. show ( ) . await ?;
169
236
170
- // Use SQL to run the new window function :
237
+ // Use SQL to run smooth_it :
171
238
//
172
239
// `PARTITION BY car`: each distinct value of car (red, and green)
173
240
// should be treated as a separate partition (and will result in
@@ -201,7 +268,7 @@ async fn main() -> Result<()> {
201
268
// print the results
202
269
df. show ( ) . await ?;
203
270
204
- // this time, call the new widow function with an explicit
271
+ // this time, call the function with an explicit
205
272
// window so evaluate will be invoked with each window.
206
273
//
207
274
// `ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING`: each invocation
@@ -232,5 +299,22 @@ async fn main() -> Result<()> {
232
299
// print the results
233
300
df. show ( ) . await ?;
234
301
302
+ // Use SQL to run simplify_smooth_it
303
+ let df = ctx
304
+ . sql (
305
+ "SELECT \
306
+ car, \
307
+ speed, \
308
+ simplify_smooth_it(speed) OVER (PARTITION BY car ORDER BY time) AS smooth_speed,\
309
+ time \
310
+ from cars \
311
+ ORDER BY \
312
+ car",
313
+ )
314
+ . await ?;
315
+
316
+ // print the results
317
+ df. show ( ) . await ?;
318
+
235
319
Ok ( ( ) )
236
320
}
0 commit comments