Skip to content

Commit e90b3ac

Browse files
authored
Minor: unecessary row_count calculation in CrossJoinExec and NestedLoopsJoinExec (#11632)
* Minor: remove row_count calculation * Minor: remove row_count calculation
1 parent 6efdbe6 commit e90b3ac

File tree

2 files changed

+19
-26
lines changed

2 files changed

+19
-26
lines changed

datafusion/physical-plan/src/joins/cross_join.rs

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -154,24 +154,19 @@ async fn load_left_input(
154154
let stream = merge.execute(0, context)?;
155155

156156
// Load all batches and count the rows
157-
let (batches, _num_rows, _, reservation) = stream
158-
.try_fold(
159-
(Vec::new(), 0usize, metrics, reservation),
160-
|mut acc, batch| async {
161-
let batch_size = batch.get_array_memory_size();
162-
// Reserve memory for incoming batch
163-
acc.3.try_grow(batch_size)?;
164-
// Update metrics
165-
acc.2.build_mem_used.add(batch_size);
166-
acc.2.build_input_batches.add(1);
167-
acc.2.build_input_rows.add(batch.num_rows());
168-
// Update rowcount
169-
acc.1 += batch.num_rows();
170-
// Push batch to output
171-
acc.0.push(batch);
172-
Ok(acc)
173-
},
174-
)
157+
let (batches, _metrics, reservation) = stream
158+
.try_fold((Vec::new(), metrics, reservation), |mut acc, batch| async {
159+
let batch_size = batch.get_array_memory_size();
160+
// Reserve memory for incoming batch
161+
acc.2.try_grow(batch_size)?;
162+
// Update metrics
163+
acc.1.build_mem_used.add(batch_size);
164+
acc.1.build_input_batches.add(1);
165+
acc.1.build_input_rows.add(batch.num_rows());
166+
// Push batch to output
167+
acc.0.push(batch);
168+
Ok(acc)
169+
})
175170
.await?;
176171

177172
let merged_batch = concat_batches(&left_schema, &batches)?;

datafusion/physical-plan/src/joins/nested_loop_join.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -364,19 +364,17 @@ async fn collect_left_input(
364364
let stream = merge.execute(0, context)?;
365365

366366
// Load all batches and count the rows
367-
let (batches, _num_rows, metrics, mut reservation) = stream
367+
let (batches, metrics, mut reservation) = stream
368368
.try_fold(
369-
(Vec::new(), 0usize, join_metrics, reservation),
369+
(Vec::new(), join_metrics, reservation),
370370
|mut acc, batch| async {
371371
let batch_size = batch.get_array_memory_size();
372372
// Reserve memory for incoming batch
373-
acc.3.try_grow(batch_size)?;
373+
acc.2.try_grow(batch_size)?;
374374
// Update metrics
375-
acc.2.build_mem_used.add(batch_size);
376-
acc.2.build_input_batches.add(1);
377-
acc.2.build_input_rows.add(batch.num_rows());
378-
// Update rowcount
379-
acc.1 += batch.num_rows();
375+
acc.1.build_mem_used.add(batch_size);
376+
acc.1.build_input_batches.add(1);
377+
acc.1.build_input_rows.add(batch.num_rows());
380378
// Push batch to output
381379
acc.0.push(batch);
382380
Ok(acc)

0 commit comments

Comments
 (0)