@@ -47,7 +47,7 @@ use arrow::compute::concat_batches;
47
47
use arrow:: datatypes:: { Schema , SchemaRef } ;
48
48
use arrow:: record_batch:: RecordBatch ;
49
49
use arrow:: util:: bit_util;
50
- use datafusion_common:: { exec_err , JoinSide , Result , Statistics } ;
50
+ use datafusion_common:: { exec_datafusion_err , JoinSide , Result , Statistics } ;
51
51
use datafusion_execution:: memory_pool:: { MemoryConsumer , MemoryReservation } ;
52
52
use datafusion_execution:: TaskContext ;
53
53
use datafusion_expr:: JoinType ;
@@ -562,62 +562,54 @@ fn join_left_and_right_batch(
562
562
schema : & Schema ,
563
563
visited_left_side : & SharedBitmapBuilder ,
564
564
) -> Result < RecordBatch > {
565
- let indices_result = ( 0 ..left_batch. num_rows ( ) )
565
+ let indices = ( 0 ..left_batch. num_rows ( ) )
566
566
. map ( |left_row_index| {
567
567
build_join_indices ( left_row_index, right_batch, left_batch, filter)
568
568
} )
569
- . collect :: < Result < Vec < ( UInt64Array , UInt32Array ) > > > ( ) ;
569
+ . collect :: < Result < Vec < ( UInt64Array , UInt32Array ) > > > ( )
570
+ . map_err ( |e| {
571
+ exec_datafusion_err ! (
572
+ "Fail to build join indices in NestedLoopJoinExec, error:{e}"
573
+ )
574
+ } ) ?;
570
575
571
576
let mut left_indices_builder = UInt64Builder :: new ( ) ;
572
577
let mut right_indices_builder = UInt32Builder :: new ( ) ;
573
- let left_right_indices = match indices_result {
574
- Err ( err) => {
575
- exec_err ! ( "Fail to build join indices in NestedLoopJoinExec, error:{err}" )
576
- }
577
- Ok ( indices) => {
578
- for ( left_side, right_side) in indices {
579
- left_indices_builder
580
- . append_values ( left_side. values ( ) , & vec ! [ true ; left_side. len( ) ] ) ;
581
- right_indices_builder
582
- . append_values ( right_side. values ( ) , & vec ! [ true ; right_side. len( ) ] ) ;
583
- }
584
- Ok ( (
585
- left_indices_builder. finish ( ) ,
586
- right_indices_builder. finish ( ) ,
587
- ) )
588
- }
589
- } ;
590
- match left_right_indices {
591
- Ok ( ( left_side, right_side) ) => {
592
- // set the left bitmap
593
- // and only full join need the left bitmap
594
- if need_produce_result_in_final ( join_type) {
595
- let mut bitmap = visited_left_side. lock ( ) ;
596
- left_side. iter ( ) . flatten ( ) . for_each ( |x| {
597
- bitmap. set_bit ( x as usize , true ) ;
598
- } ) ;
599
- }
600
- // adjust the two side indices base on the join type
601
- let ( left_side, right_side) = adjust_indices_by_join_type (
602
- left_side,
603
- right_side,
604
- 0 ..right_batch. num_rows ( ) ,
605
- join_type,
606
- false ,
607
- ) ;
578
+ for ( left_side, right_side) in indices {
579
+ left_indices_builder
580
+ . append_values ( left_side. values ( ) , & vec ! [ true ; left_side. len( ) ] ) ;
581
+ right_indices_builder
582
+ . append_values ( right_side. values ( ) , & vec ! [ true ; right_side. len( ) ] ) ;
583
+ }
608
584
609
- build_batch_from_indices (
610
- schema,
611
- left_batch,
612
- right_batch,
613
- & left_side,
614
- & right_side,
615
- column_indices,
616
- JoinSide :: Left ,
617
- )
618
- }
619
- Err ( e) => Err ( e) ,
585
+ let left_side = left_indices_builder. finish ( ) ;
586
+ let right_side = right_indices_builder. finish ( ) ;
587
+ // set the left bitmap
588
+ // and only full join need the left bitmap
589
+ if need_produce_result_in_final ( join_type) {
590
+ let mut bitmap = visited_left_side. lock ( ) ;
591
+ left_side. iter ( ) . flatten ( ) . for_each ( |x| {
592
+ bitmap. set_bit ( x as usize , true ) ;
593
+ } ) ;
620
594
}
595
+ // adjust the two side indices base on the join type
596
+ let ( left_side, right_side) = adjust_indices_by_join_type (
597
+ left_side,
598
+ right_side,
599
+ 0 ..right_batch. num_rows ( ) ,
600
+ join_type,
601
+ false ,
602
+ ) ;
603
+
604
+ build_batch_from_indices (
605
+ schema,
606
+ left_batch,
607
+ right_batch,
608
+ & left_side,
609
+ & right_side,
610
+ column_indices,
611
+ JoinSide :: Left ,
612
+ )
621
613
}
622
614
623
615
fn get_final_indices_from_shared_bitmap (
0 commit comments