Skip to content

Commit 59a5ac0

Browse files
korowa authored and ccciudatu committed
fix: duplicate output for HashJoinExec in CollectLeft mode (apache#9757)
* fix: duplicate output for HashJoinExec in CollectLeft mode
* address review comments
* test fix after merging main
1 parent 2d42a2c commit 59a5ac0

File tree

3 files changed

+261
-117
lines changed

3 files changed

+261
-117
lines changed

datafusion/core/src/physical_optimizer/join_selection.rs

+18-55
Original file line number | Diff line number | Diff line change
@@ -305,11 +305,6 @@ impl PhysicalOptimizerRule for JoinSelection {
305305
/// `CollectLeft` mode is applicable. Otherwise, it will try to swap the join sides.
306306
/// When the `ignore_threshold` is false, this function will also check left
307307
/// and right sizes in bytes or rows.
308-
///
309-
/// For [`JoinType::Full`], it can not use `CollectLeft` mode and will return `None`.
310-
/// For [`JoinType::Left`] and [`JoinType::LeftAnti`], it can not run `CollectLeft`
311-
/// mode as is, but it can do so by changing the join type to [`JoinType::Right`]
312-
/// and [`JoinType::RightAnti`], respectively.
313308
fn try_collect_left(
314309
hash_join: &HashJoinExec,
315310
ignore_threshold: bool,
@@ -318,38 +313,20 @@ fn try_collect_left(
318313
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
319314
let left = hash_join.left();
320315
let right = hash_join.right();
321-
let join_type = hash_join.join_type();
322316

323-
let left_can_collect = match join_type {
324-
JoinType::Left | JoinType::Full | JoinType::LeftAnti => false,
325-
JoinType::Inner
326-
| JoinType::LeftSemi
327-
| JoinType::Right
328-
| JoinType::RightSemi
329-
| JoinType::RightAnti => {
330-
ignore_threshold
331-
|| supports_collect_by_thresholds(
332-
&**left,
333-
threshold_byte_size,
334-
threshold_num_rows,
335-
)
336-
}
337-
};
338-
let right_can_collect = match join_type {
339-
JoinType::Right | JoinType::Full | JoinType::RightAnti => false,
340-
JoinType::Inner
341-
| JoinType::RightSemi
342-
| JoinType::Left
343-
| JoinType::LeftSemi
344-
| JoinType::LeftAnti => {
345-
ignore_threshold
346-
|| supports_collect_by_thresholds(
347-
&**right,
348-
threshold_byte_size,
349-
threshold_num_rows,
350-
)
351-
}
352-
};
317+
let left_can_collect = ignore_threshold
318+
|| supports_collect_by_thresholds(
319+
&**left,
320+
threshold_byte_size,
321+
threshold_num_rows,
322+
);
323+
let right_can_collect = ignore_threshold
324+
|| supports_collect_by_thresholds(
325+
&**right,
326+
threshold_byte_size,
327+
threshold_num_rows,
328+
);
329+
353330
match (left_can_collect, right_can_collect) {
354331
(true, true) => {
355332
if should_swap_join_order(&**left, &**right)?
@@ -916,9 +893,9 @@ mod tests_statistical {
916893
}
917894

918895
#[tokio::test]
919-
async fn test_left_join_with_swap() {
896+
async fn test_left_join_no_swap() {
920897
let (big, small) = create_big_and_small();
921-
// Left out join should alway swap when the mode is PartitionMode::CollectLeft, even left side is small and right side is large
898+
922899
let join = Arc::new(
923900
HashJoinExec::try_new(
924901
Arc::clone(&small),
@@ -942,32 +919,18 @@ mod tests_statistical {
942919
.optimize(join.clone(), &ConfigOptions::new())
943920
.unwrap();
944921

945-
let swapping_projection = optimized_join
946-
.as_any()
947-
.downcast_ref::<ProjectionExec>()
948-
.expect("A proj is required to swap columns back to their original order");
949-
950-
assert_eq!(swapping_projection.expr().len(), 2);
951-
let (col, name) = &swapping_projection.expr()[0];
952-
assert_eq!(name, "small_col");
953-
assert_col_expr(col, "small_col", 1);
954-
let (col, name) = &swapping_projection.expr()[1];
955-
assert_eq!(name, "big_col");
956-
assert_col_expr(col, "big_col", 0);
957-
958-
let swapped_join = swapping_projection
959-
.input()
922+
let swapped_join = optimized_join
960923
.as_any()
961924
.downcast_ref::<HashJoinExec>()
962925
.expect("The type of the plan should not be changed");
963926

964927
assert_eq!(
965928
swapped_join.left().statistics().unwrap().total_byte_size,
966-
Precision::Inexact(2097152)
929+
Precision::Inexact(8192)
967930
);
968931
assert_eq!(
969932
swapped_join.right().statistics().unwrap().total_byte_size,
970-
Precision::Inexact(8192)
933+
Precision::Inexact(2097152)
971934
);
972935
crosscheck_plans(join.clone()).unwrap();
973936
}

0 commit comments

Comments
 (0)