File tree 1 file changed +9
-3
lines changed
datafusion/physical-plan/src/repartition
1 file changed +9
-3
lines changed Original file line number Diff line number Diff line change @@ -261,6 +261,7 @@ impl BatchPartitioner {
261
261
num_partitions : partitions,
262
262
hash_buffer,
263
263
} => {
264
+ // Tracking time required for distributing indexes across output partitions
264
265
let timer = self . timer . timer ( ) ;
265
266
266
267
let arrays = exprs
@@ -282,6 +283,11 @@ impl BatchPartitioner {
282
283
. append_value ( index as u64 ) ;
283
284
}
284
285
286
+ // Finished building index-arrays for output partitions
287
+ timer. done ( ) ;
288
+
289
+ // Borrowing partitioner timer to prevent moving `self` to closure
290
+ let partitioner_timer = & self . timer ;
285
291
let it = indices
286
292
. into_iter ( )
287
293
. enumerate ( )
@@ -290,6 +296,9 @@ impl BatchPartitioner {
290
296
( !indices. is_empty ( ) ) . then_some ( ( partition, indices) )
291
297
} )
292
298
. map ( move |( partition, indices) | {
299
+ // Tracking time required for repartitioned batches construction
300
+ let _timer = partitioner_timer. timer ( ) ;
301
+
293
302
// Produce batches based on indices
294
303
let columns = batch
295
304
. columns ( )
@@ -303,9 +312,6 @@ impl BatchPartitioner {
303
312
let batch =
304
313
RecordBatch :: try_new ( batch. schema ( ) , columns) . unwrap ( ) ;
305
314
306
- // bind timer so it drops w/ this iterator
307
- let _ = & timer;
308
-
309
315
Ok ( ( partition, batch) )
310
316
} ) ;
311
317
You can’t perform that action at this time.
0 commit comments