Skip to content

Commit f10db59

Browse files
committed
Add coalescing into FilterExec
1 parent 3f9bf97 commit f10db59

File tree

1 file changed

+43
-13
lines changed

1 file changed

+43
-13
lines changed

datafusion/physical-plan/src/filter.rs

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use datafusion_physical_expr::{
4444
analyze, split_conjunction, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr,
4545
};
4646

47+
use crate::coalesce::{BatchCoalescer, CoalescerState};
4748
use futures::stream::{Stream, StreamExt};
4849
use log::trace;
4950

@@ -279,10 +280,12 @@ impl ExecutionPlan for FilterExec {
279280
trace!("Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
280281
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
281282
Ok(Box::pin(FilterExecStream {
282-
schema: self.input.schema(),
283+
done: false,
283284
predicate: Arc::clone(&self.predicate),
284285
input: self.input.execute(partition, context)?,
285286
baseline_metrics,
287+
// TODO use actual target batch size, for now hardcode the default size
288+
coalescer: BatchCoalescer::new(self.input.schema(), 8192, self.fetch()),
286289
}))
287290
}
288291

@@ -337,14 +340,16 @@ fn collect_new_statistics(
337340
/// The FilterExec streams wraps the input iterator and applies the predicate expression to
338341
/// determine which rows to include in its output batches
339342
struct FilterExecStream {
340-
/// Output schema, which is the same as the input schema for this operator
341-
schema: SchemaRef,
343+
/// Is the sstream done?
344+
done: bool,
342345
/// The expression to filter on. This expression must evaluate to a boolean value.
343346
predicate: Arc<dyn PhysicalExpr>,
344347
/// The input partition to filter.
345348
input: SendableRecordBatchStream,
346349
/// runtime metrics recording
347350
baseline_metrics: BaselineMetrics,
351+
/// Build up output batches incrementally
352+
coalescer: BatchCoalescer,
348353
}
349354

350355
pub fn batch_filter(
@@ -374,22 +379,47 @@ impl Stream for FilterExecStream {
374379
mut self: Pin<&mut Self>,
375380
cx: &mut Context<'_>,
376381
) -> Poll<Option<Self::Item>> {
382+
if self.done {
383+
return Poll::Ready(None);
384+
}
377385
let poll;
378386
loop {
379387
match ready!(self.input.poll_next_unpin(cx)) {
380388
Some(Ok(batch)) => {
381-
let timer = self.baseline_metrics.elapsed_compute().timer();
389+
// clone timer so we can borrow self mutably
390+
let elapsed_compute = self.baseline_metrics.elapsed_compute().clone();
391+
let _timer = elapsed_compute.timer(); // records time on drop
382392
let filtered_batch = batch_filter(&batch, &self.predicate)?;
383-
timer.done();
384-
// skip entirely filtered batches
385-
if filtered_batch.num_rows() == 0 {
386-
continue;
387-
}
388-
poll = Poll::Ready(Some(Ok(filtered_batch)));
393+
match self.coalescer.push_batch(filtered_batch) {
394+
CoalescerState::TargetReached => {
395+
let batch = self.coalescer.finish_batch();
396+
poll = Poll::Ready(Some(batch));
397+
break;
398+
}
399+
CoalescerState::Continue => {
400+
// sill need more rows
401+
continue;
402+
}
403+
CoalescerState::LimitReached => {
404+
let batch = self.coalescer.finish_batch();
405+
poll = Poll::Ready(Some(batch));
406+
break;
407+
}
408+
};
409+
}
410+
// end of input, see if we have any remaining batches
411+
None => {
412+
self.done = true;
413+
let maybe_result = if self.coalescer.is_empty() {
414+
None
415+
} else {
416+
Some(self.coalescer.finish_batch())
417+
};
418+
poll = Poll::Ready(maybe_result);
389419
break;
390420
}
391-
value => {
392-
poll = Poll::Ready(value);
421+
err => {
422+
poll = Poll::Ready(err);
393423
break;
394424
}
395425
}
@@ -405,7 +435,7 @@ impl Stream for FilterExecStream {
405435

406436
impl RecordBatchStream for FilterExecStream {
407437
fn schema(&self) -> SchemaRef {
408-
Arc::clone(&self.schema)
438+
self.coalescer.schema()
409439
}
410440
}
411441

0 commit comments

Comments
 (0)