File tree 1 file changed +4
-4
lines changed
datafusion/physical-plan/src/repartition
1 file changed +4
-4
lines changed Original file line number Diff line number Diff line change @@ -277,7 +277,7 @@ impl BatchPartitioner {
277
277
let arrays = exprs
278
278
. iter ( )
279
279
. map ( |expr| {
280
- expr. evaluate ( & batch) ?. into_array ( batch. num_rows ( ) )
280
+ expr. evaluate ( batch) ?. into_array ( batch. num_rows ( ) )
281
281
} )
282
282
. collect :: < Result < Vec < _ > > > ( ) ?;
283
283
hash_buffer. clear ( ) ;
@@ -861,7 +861,7 @@ impl RepartitionExec {
861
861
timer. done ( ) ;
862
862
863
863
// Input is done
864
- let _ = match result {
864
+ match result {
865
865
Some ( result) => {
866
866
batches_buffer. push ( result?) ;
867
867
if is_hash_partitioning
@@ -874,7 +874,8 @@ impl RepartitionExec {
874
874
None if batches_buffer. is_empty ( ) => break ,
875
875
None => { }
876
876
} ;
877
- for res in partitioner. partition_iter ( batches_buffer. clone ( ) ) ? {
877
+ let batches_buffer = std:: mem:: take ( & mut batches_buffer) ;
878
+ for res in partitioner. partition_iter ( batches_buffer) ? {
878
879
let ( partition, batch) = res?;
879
880
let size = batch. get_array_memory_size ( ) ;
880
881
@@ -891,7 +892,6 @@ impl RepartitionExec {
891
892
}
892
893
timer. done ( ) ;
893
894
}
894
- batches_buffer. clear ( ) ;
895
895
896
896
// If the input stream is endless, we may spin forever and
897
897
// never yield back to tokio. See
You can’t perform that action at this time.
0 commit comments