File tree 1 file changed +21
-7
lines changed
datafusion/functions-aggregate/src
1 file changed +21
-7
lines changed Original file line number Diff line number Diff line change @@ -242,14 +242,28 @@ impl<T: ArrowNumericType> Debug for MedianAccumulator<T> {
242
242
243
243
impl < T : ArrowNumericType > Accumulator for MedianAccumulator < T > {
244
244
fn state ( & mut self ) -> Result < Vec < ScalarValue > > {
245
- let all_values = self
246
- . all_values
247
- . iter ( )
248
- . map ( |x| ScalarValue :: new_primitive :: < T > ( Some ( * x) , & self . data_type ) )
249
- . collect :: < Result < Vec < _ > > > ( ) ?;
245
+ // Convert `all_values` to `ListArray` and return a single List ScalarValue
250
246
251
- let arr = ScalarValue :: new_list_nullable ( & all_values, & self . data_type ) ;
252
- Ok ( vec ! [ ScalarValue :: List ( arr) ] )
247
+ // Build offsets
248
+ let offsets =
249
+ OffsetBuffer :: new ( ScalarBuffer :: from ( vec ! [ 0 , self . all_values. len( ) as i32 ] ) ) ;
250
+
251
+ // Build inner array
252
+ let values_array = PrimitiveArray :: < T > :: new (
253
+ ScalarBuffer :: from ( std:: mem:: take ( & mut self . all_values ) ) ,
254
+ None ,
255
+ )
256
+ . with_data_type ( self . data_type . clone ( ) ) ;
257
+
258
+ // Build the result list array
259
+ let list_array = ListArray :: new (
260
+ Arc :: new ( Field :: new_list_field ( self . data_type . clone ( ) , true ) ) ,
261
+ offsets,
262
+ Arc :: new ( values_array) ,
263
+ None ,
264
+ ) ;
265
+
266
+ Ok ( vec ! [ ScalarValue :: List ( Arc :: new( list_array) ) ] )
253
267
}
254
268
255
269
fn update_batch ( & mut self , values : & [ ArrayRef ] ) -> Result < ( ) > {
You can’t perform that action at this time.
0 commit comments