Skip to content

Commit d3cfc45

Browse files
authored
Minor: Use div_ceil
1 parent 47569b2 commit d3cfc45

File tree

2 files changed

+26
-25
lines changed

2 files changed

+26
-25
lines changed

datafusion/physical-plan/src/joins/cross_join.rs

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -190,18 +190,21 @@ async fn load_left_input(
190190

191191
// Load all batches and count the rows
192192
let (batches, _metrics, reservation) = stream
193-
.try_fold((Vec::new(), metrics, reservation), |mut acc, batch| async {
194-
let batch_size = batch.get_array_memory_size();
195-
// Reserve memory for incoming batch
196-
acc.2.try_grow(batch_size)?;
197-
// Update metrics
198-
acc.1.build_mem_used.add(batch_size);
199-
acc.1.build_input_batches.add(1);
200-
acc.1.build_input_rows.add(batch.num_rows());
201-
// Push batch to output
202-
acc.0.push(batch);
203-
Ok(acc)
204-
})
193+
.try_fold(
194+
(Vec::new(), metrics, reservation),
195+
|(mut batches, metrics, mut reservation), batch| async {
196+
let batch_size = batch.get_array_memory_size();
197+
// Reserve memory for incoming batch
198+
reservation.try_grow(batch_size)?;
199+
// Update metrics
200+
metrics.build_mem_used.add(batch_size);
201+
metrics.build_input_batches.add(1);
202+
metrics.build_input_rows.add(batch.num_rows());
203+
// Push batch to output
204+
batches.push(batch);
205+
Ok((batches, metrics, reservation))
206+
},
207+
)
205208
.await?;
206209

207210
let merged_batch = concat_batches(&left_schema, &batches)?;

datafusion/physical-plan/src/joins/nested_loop_join.rs

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ use arrow::array::{BooleanBufferBuilder, UInt32Array, UInt64Array};
4545
use arrow::compute::concat_batches;
4646
use arrow::datatypes::{Schema, SchemaRef};
4747
use arrow::record_batch::RecordBatch;
48-
use arrow::util::bit_util;
4948
use datafusion_common::{
5049
exec_datafusion_err, internal_err, JoinSide, Result, Statistics,
5150
};
@@ -440,17 +439,17 @@ async fn collect_left_input(
440439
let (batches, metrics, mut reservation) = stream
441440
.try_fold(
442441
(Vec::new(), join_metrics, reservation),
443-
|mut acc, batch| async {
442+
|(mut batches, metrics, mut reservation), batch| async {
444443
let batch_size = batch.get_array_memory_size();
445444
// Reserve memory for incoming batch
446-
acc.2.try_grow(batch_size)?;
445+
reservation.try_grow(batch_size)?;
447446
// Update metrics
448-
acc.1.build_mem_used.add(batch_size);
449-
acc.1.build_input_batches.add(1);
450-
acc.1.build_input_rows.add(batch.num_rows());
447+
metrics.build_mem_used.add(batch_size);
448+
metrics.build_input_batches.add(1);
449+
metrics.build_input_rows.add(batch.num_rows());
451450
// Push batch to output
452-
acc.0.push(batch);
453-
Ok(acc)
451+
batches.push(batch);
452+
Ok((batches, metrics, reservation))
454453
},
455454
)
456455
.await?;
@@ -459,14 +458,13 @@ async fn collect_left_input(
459458

460459
// Reserve memory for visited_left_side bitmap if required by join type
461460
let visited_left_side = if with_visited_left_side {
462-
// TODO: Replace `ceil` wrapper with stable `div_cell` after
463-
// https://github.com/rust-lang/rust/issues/88581
464-
let buffer_size = bit_util::ceil(merged_batch.num_rows(), 8);
461+
let n_rows = merged_batch.num_rows();
462+
let buffer_size = n_rows.div_ceil(8);
465463
reservation.try_grow(buffer_size)?;
466464
metrics.build_mem_used.add(buffer_size);
467465

468-
let mut buffer = BooleanBufferBuilder::new(merged_batch.num_rows());
469-
buffer.append_n(merged_batch.num_rows(), false);
466+
let mut buffer = BooleanBufferBuilder::new(n_rows);
467+
buffer.append_n(n_rows, false);
470468
buffer
471469
} else {
472470
BooleanBufferBuilder::new(0)

0 commit comments

Comments
 (0)