Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit nested loop join record batch size #12634

Open
wants to merge 6 commits into
base: main
Choose a base branch
from

Conversation

mhilton
Copy link
Contributor

@mhilton mhilton commented Sep 26, 2024

Which issue does this PR close?

Closes #12633.

Rationale for this change

Some joins use an excessive amount of memory due to creating very large record batches. This will reduce that memory use.

What changes are included in this PR?

From the high level there are two changes introduced by this PR.

The first is to process the probe-side input batches in smaller sizes. The processing loop only processes as many rows of the probe-side input that are likely to fit in a record batch. This is somewhat pessimistic and assumes that for each probe-side row there will be one output row per build-side row (INNER joins excepted). It is possible that this could be tuned in the future to balance processing speed with memory use. In order to make progress at least one probe-side row will be processed on each loop.

The second change is to introduce an output buffer. This is used to consolidate small record batches where the JOIN condition has low selectivity. If the join condition has a high selectivity and therefore produces large batches the output buffer breaks these into smaller batches for further processing. The output buffer will always produce one batch, even if that batch is empty.

Are these changes tested?

There is a new test that ensures the output batches from NestedLoopJoinExec are no bigger than the configured batch size.

Existing tests are assumed to be sufficient to show that the behaviour hasn't changed.

Repeated the example from #12633 gives:

> SHOW datafusion.execution.batch_size;
+---------------------------------+-------+
| name                            | value |
+---------------------------------+-------+
| datafusion.execution.batch_size | 8192  |
+---------------------------------+-------+
1 row(s) fetched. 
Elapsed 0.039 seconds.

> CREATE TABLE test AS VALUES (0), (1), (2), (3), (4), (5), (6), (7), (8), (9);
0 row(s) fetched. 
Elapsed 0.010 seconds.

> EXPLAIN ANALYZE WITH test_t AS (SELECT concat(t1.column1, t2.column1, t3.column1, t4.column1, t5.column1) AS v FROM test t1, test t2, test t3, test t4, test t5) SELECT * FROM test_t tt1 FULL OUTER JOIN test_t tt2 ON tt1.v<>tt2.v;
+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                                                                                                  |
+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | NestedLoopJoinExec: join_type=Full, filter=v@0 != v@1, metrics=[output_rows=9999900000, build_input_batches=10000, build_input_rows=100000, input_batches=10000, input_rows=100000, output_batches=1300010, build_mem_used=2492500, build_time=170.59826ms, join_time=309.369402772s] |
|                   |   CoalescePartitionsExec, metrics=[output_rows=100000, elapsed_compute=30.75µs]                                                                                                                                                                                                       |
|                   |     ProjectionExec: expr=[concat(CAST(column1@1 AS Utf8), CAST(column1@2 AS Utf8), CAST(column1@3 AS Utf8), CAST(column1@4 AS Utf8), CAST(column1@0 AS Utf8)) as v], metrics=[output_rows=100000, elapsed_compute=67.949286ms]                                                        |
|                   |       CrossJoinExec, metrics=[output_rows=100000, build_input_batches=1, build_input_rows=10, input_batches=1000, input_rows=10000, output_batches=10000, build_mem_used=224, build_time=139.458µs, join_time=8.338651ms]                                                             |
|                   |         MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                                     |
|                   |         RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, metrics=[fetch_time=1.661829ms, repartition_time=1ns, send_time=10.136821ms]                                                                                                                           |
|                   |           ProjectionExec: expr=[column1@1 as column1, column1@2 as column1, column1@3 as column1, column1@0 as column1], metrics=[output_rows=10000, elapsed_compute=348.255µs]                                                                                                       |
|                   |             CrossJoinExec, metrics=[output_rows=10000, build_input_batches=1, build_input_rows=10, input_batches=100, input_rows=1000, output_batches=1000, build_mem_used=224, build_time=9.917µs, join_time=464.211µs]                                                              |
|                   |               MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                               |
|                   |               ProjectionExec: expr=[column1@1 as column1, column1@2 as column1, column1@0 as column1], metrics=[output_rows=1000, elapsed_compute=33.044µs]                                                                                                                           |
|                   |                 CrossJoinExec, metrics=[output_rows=1000, build_input_batches=1, build_input_rows=10, input_batches=10, input_rows=100, output_batches=100, build_mem_used=224, build_time=1.375µs, join_time=53.299µs]                                                               |
|                   |                   MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                           |
|                   |                   CrossJoinExec, metrics=[output_rows=100, build_input_batches=1, build_input_rows=10, input_batches=1, input_rows=10, output_batches=10, build_mem_used=224, build_time=1.083µs, join_time=244.708µs]                                                                |
|                   |                     MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                         |
|                   |                     MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                         |
|                   |   ProjectionExec: expr=[concat(CAST(column1@1 AS Utf8), CAST(column1@2 AS Utf8), CAST(column1@3 AS Utf8), CAST(column1@4 AS Utf8), CAST(column1@0 AS Utf8)) as v], metrics=[output_rows=100000, elapsed_compute=262.67843ms]                                                          |
|                   |     CrossJoinExec, metrics=[output_rows=100000, build_input_batches=1, build_input_rows=10, input_batches=1000, input_rows=10000, output_batches=10000, build_mem_used=224, build_time=5.916µs, join_time=60.39301ms]                                                                 |
|                   |       MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                                       |
|                   |       RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, metrics=[fetch_time=1.857489ms, repartition_time=1ns, send_time=31.12258491s]                                                                                                                            |
|                   |         ProjectionExec: expr=[column1@1 as column1, column1@2 as column1, column1@3 as column1, column1@0 as column1], metrics=[output_rows=10000, elapsed_compute=408.628µs]                                                                                                         |
|                   |           CrossJoinExec, metrics=[output_rows=10000, build_input_batches=1, build_input_rows=10, input_batches=100, input_rows=1000, output_batches=1000, build_mem_used=224, build_time=792ns, join_time=926.525µs]                                                                  |
|                   |             MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                                 |
|                   |             ProjectionExec: expr=[column1@1 as column1, column1@2 as column1, column1@0 as column1], metrics=[output_rows=1000, elapsed_compute=44.416µs]                                                                                                                             |
|                   |               CrossJoinExec, metrics=[output_rows=1000, build_input_batches=1, build_input_rows=10, input_batches=10, input_rows=100, output_batches=100, build_mem_used=224, build_time=416ns, join_time=95.039µs]                                                                   |
|                   |                 MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                             |
|                   |                 CrossJoinExec, metrics=[output_rows=100, build_input_batches=1, build_input_rows=10, input_batches=1, input_rows=10, output_batches=10, build_mem_used=224, build_time=417ns, join_time=4.499µs]                                                                      |
|                   |                   MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                           |
|                   |                   MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                           |
|                   |                                                                                                                                                                                                                                                                                       |
+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched. 
Elapsed 32.145 seconds.

Which is a mean batch size of 7692.17 rows.

Are there any user-facing changes?

No users should not notice any bahvioural difference.

Nested loop join creates a single output batch for each (right side)
input batch. When performing an outer join the size of the output
batch can be as large as number of left data rows * batch rows. If
the size of the left data is large then this can produce unreasonably
large output batches. Attempt to reduce the size of the output
batches by only processing a subset of the input batch at a time
where the output could be very large. The trade-off is that this
can produce a ;arge number of very small batches instead if the
left data is large but there is a highly selective filter.
Use buffering to keep the size of output batches from nested loop
join around the configured batch size. Small record batches are
buffered until there is enough rows available to fill a full batch
at which point the small batches are combined into a single batch.
Larger batches have batch sized slices taken from them until they
become smaller than the configured batch size.
Add a test that the nested loop join keeps the output batches smaller
than the configured batch size.
@github-actions github-actions bot added physical-expr Physical Expressions core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) labels Sep 26, 2024
Copy link
Contributor

@comphead comphead left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @mhilton I'm planning to review it this week
@korowa FYI

@comphead
Copy link
Contributor

@mhilton would be that possible to create a unit test reproducing the problem? This will also be important to prevent regression. The repro can be on small batch size up to 5

Add a test that exercises the large batch size issue described in
issue apache#12633. This was a code review request.
@mhilton
Copy link
Contributor Author

mhilton commented Sep 27, 2024

@mhilton would be that possible to create a unit test reproducing the problem? This will also be important to prevent regression. The repro can be on small batch size up to 5

I have added test_issue_12633 which covers this. I've checked that it fails without the changes from this PR.

let right_batch = self.outer_record_batch.as_ref().unwrap();
let num_rows = match (self.join_type, left_data.batch().num_rows()) {
// An inner join will only produce 1 output row per input row.
(JoinType::Inner, _) | (_, 0) => self.output_buffer.needed_rows(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it possible for an Inner Join produce multiple output rows per input row? I believe the "1 output row per input row" statement only holds true for equijoins. NestedLoopJoin works on non equijoins by definition.

/// NestedLoopJoinExec is build-probe join operator, whose main task is to
/// perform joins without any equijoin conditions in `ON` clause.

For example, for left=[1, 2] and right=[1, 2, 3] with the ON clause left<>right, it produces [(1, 2), (2, 1), (1, 3), (2, 3)]. The row 3 from the right side produces 2 rows.

It seems impossible to predict the number of output rows without running the join.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right of course, I'm not sure why I got it in my head that they would be 1-1. I'll fix that.

Stop assuming that an INNER join cannot produce more output rows
than input. Use the same row count logic for all join types.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate physical-expr Physical Expressions sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

NestedLoopJoinExec can create excessively large record batches
3 participants