-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Limit nested loop join record batch size #12634
base: main
Are you sure you want to change the base?
Limit nested loop join record batch size #12634
Conversation
Nested loop join creates a single output batch for each (right side) input batch. When performing an outer join the size of the output batch can be as large as number of left data rows * batch rows. If the size of the left data is large then this can produce unreasonably large output batches. Attempt to reduce the size of the output batches by only processing a subset of the input batch at a time where the output could be very large. The trade-off is that this can produce a ;arge number of very small batches instead if the left data is large but there is a highly selective filter.
Use buffering to keep the size of output batches from nested loop join around the configured batch size. Small record batches are buffered until there is enough rows available to fill a full batch at which point the small batches are combined into a single batch. Larger batches have batch sized slices taken from them until they become smaller than the configured batch size.
Add a test that the nested loop join keeps the output batches smaller than the configured batch size.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mhilton would be that possible to create a unit test reproducing the problem? This will also be important to prevent regression. The repro can be on small batch size up to 5 |
Add a test that exercises the large batch size issue described in issue apache#12633. This was a code review request.
I have added |
let right_batch = self.outer_record_batch.as_ref().unwrap(); | ||
let num_rows = match (self.join_type, left_data.batch().num_rows()) { | ||
// An inner join will only produce 1 output row per input row. | ||
(JoinType::Inner, _) | (_, 0) => self.output_buffer.needed_rows(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't it possible for an Inner Join produce multiple output rows per input row? I believe the "1 output row per input row" statement only holds true for equijoins. NestedLoopJoin works on non equijoins by definition.
datafusion/datafusion/physical-plan/src/joins/nested_loop_join.rs
Lines 103 to 104 in 9b4f90a
/// NestedLoopJoinExec is build-probe join operator, whose main task is to | |
/// perform joins without any equijoin conditions in `ON` clause. |
For example, for left=[1, 2]
and right=[1, 2, 3]
with the ON
clause left<>right
, it produces [(1, 2), (2, 1), (1, 3), (2, 3)]
. The row 3
from the right side produces 2 rows.
It seems impossible to predict the number of output rows without running the join.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right of course, I'm not sure why I got it in my head that they would be 1-1. I'll fix that.
Stop assuming that an INNER join cannot produce more output rows than input. Use the same row count logic for all join types.
Which issue does this PR close?
Closes #12633.
Rationale for this change
Some joins use an excessive amount of memory due to creating very large record batches. This will reduce that memory use.
What changes are included in this PR?
From the high level there are two changes introduced by this PR.
The first is to process the probe-side input batches in smaller sizes. The processing loop only processes as many rows of the probe-side input that are likely to fit in a record batch. This is somewhat pessimistic and assumes that for each probe-side row there will be one output row per build-side row (INNER joins excepted). It is possible that this could be tuned in the future to balance processing speed with memory use. In order to make progress at least one probe-side row will be processed on each loop.
The second change is to introduce an output buffer. This is used to consolidate small record batches where the JOIN condition has low selectivity. If the join condition has a high selectivity and therefore produces large batches the output buffer breaks these into smaller batches for further processing. The output buffer will always produce one batch, even if that batch is empty.
Are these changes tested?
There is a new test that ensures the output batches from
NestedLoopJoinExec
are no bigger than the configured batch size.Existing tests are assumed to be sufficient to show that the behaviour hasn't changed.
Repeated the example from #12633 gives:
Which is a mean batch size of 7692.17 rows.
Are there any user-facing changes?
No users should not notice any bahvioural difference.