15
15
// specific language governing permissions and limitations
16
16
// under the License.
17
17
18
- use arrow_schema:: SortOptions ;
18
+ use arrow:: array:: { Int32Array , RecordBatch } ;
19
+ use arrow_schema:: { DataType , Field , Schema , SortOptions } ;
19
20
use criterion:: { criterion_group, criterion_main, Criterion } ;
20
21
use datafusion_common:: JoinType :: Inner ;
22
+ use datafusion_datasource:: memory:: MemorySourceConfig ;
21
23
use datafusion_execution:: config:: SessionConfig ;
22
24
use datafusion_execution:: disk_manager:: DiskManagerConfig ;
23
25
use datafusion_execution:: runtime_env:: RuntimeEnvBuilder ;
@@ -29,88 +31,78 @@ use datafusion_physical_plan::ExecutionPlan;
29
31
use std:: sync:: Arc ;
30
32
use tokio:: runtime:: Runtime ;
31
33
32
- fn create_test_data ( ) -> SortMergeJoinExec {
33
- let left_batch = build_table_i32 (
34
- ( "a1" , & vec ! [ 0 , 1 , 2 , 3 , 4 , 5 ] ) ,
35
- ( "b1" , & vec ! [ 1 , 2 , 3 , 4 , 5 , 6 ] ) ,
36
- ( "c1" , & vec ! [ 4 , 5 , 6 , 7 , 8 , 9 ] ) ,
37
- ) ;
38
- let left_schema = left_batch. schema ( ) ;
39
- let left =
40
- TestMemoryExec :: try_new_exec ( & [ vec ! [ left_batch] ] , left_schema, None ) . unwrap ( ) ;
41
- let right_batch = build_table_i32 (
42
- ( "a2" , & vec ! [ 0 , 10 , 20 , 30 , 40 ] ) ,
43
- ( "b2" , & vec ! [ 1 , 3 , 4 , 6 , 8 ] ) ,
44
- ( "c2" , & vec ! [ 50 , 60 , 70 , 80 , 90 ] ) ,
45
- ) ;
46
- let right_schema = right_batch. schema ( ) ;
47
- let right =
48
- TestMemoryExec :: try_new_exec ( & [ vec ! [ right_batch] ] , right_schema, None ) . unwrap ( ) ;
34
+ fn create_smj_exec ( array_len : usize , batch_size : usize ) -> SortMergeJoinExec {
35
+ // define a schema.
36
+ let schema = Arc :: new ( Schema :: new ( vec ! [
37
+ Field :: new( "a1" , DataType :: Int32 , false ) ,
38
+ Field :: new( "b1" , DataType :: Int32 , false ) ,
39
+ Field :: new( "c1" , DataType :: Int32 , false ) ,
40
+ ] ) ) ;
41
+ // define data.
42
+ let batches = ( 0 ..array_len / batch_size)
43
+ . map ( |i| {
44
+ RecordBatch :: try_new (
45
+ Arc :: clone ( & schema) ,
46
+ vec ! [
47
+ Arc :: new( Int32Array :: from( vec![ i as i32 ; batch_size] ) ) ,
48
+ Arc :: new( Int32Array :: from( vec![ i as i32 ; batch_size] ) ) ,
49
+ Arc :: new( Int32Array :: from( vec![ i as i32 ; batch_size] ) ) ,
50
+ ] ,
51
+ )
52
+ . unwrap ( )
53
+ } )
54
+ . collect :: < Vec < _ > > ( ) ;
55
+ let datasource_exec =
56
+ MemorySourceConfig :: try_new_exec ( & vec ! [ batches] , Arc :: clone ( & schema) , None )
57
+ . unwrap ( ) ;
58
+
49
59
let on = vec ! [ (
50
- Arc :: new( Column :: new_with_schema( "b1" , & left . schema( ) ) . unwrap( ) ) as _,
51
- Arc :: new( Column :: new_with_schema( "b2 " , & right . schema( ) ) . unwrap( ) ) as _,
60
+ Arc :: new( Column :: new_with_schema( "b1" , & schema) . unwrap( ) ) as _,
61
+ Arc :: new( Column :: new_with_schema( "b1 " , & schema) . unwrap( ) ) as _,
52
62
) ] ;
53
63
let sort_options = vec ! [ SortOptions :: default ( ) ; on. len( ) ] ;
54
-
55
- SortMergeJoinExec :: try_new ( left, right, on, None , Inner , sort_options, false ) . unwrap ( )
64
+ SortMergeJoinExec :: try_new (
65
+ datasource_exec. clone ( ) ,
66
+ datasource_exec. clone ( ) ,
67
+ on,
68
+ None ,
69
+ Inner ,
70
+ sort_options,
71
+ false ,
72
+ )
73
+ . unwrap ( )
56
74
}
57
75
58
76
// `cargo bench --bench sort_merge_join`
59
77
fn bench_spill ( c : & mut Criterion ) {
60
- let sort_merge_join_exec = create_test_data ( ) ;
61
-
62
- let mut group = c. benchmark_group ( "sort_merge_join_spill" ) ;
63
78
let rt = Runtime :: new ( ) . unwrap ( ) ;
79
+ c. bench_function ( "SortMergeJoinExec_spill" , |b| {
80
+ let join_exec = create_smj_exec ( 1_048_576 , 4096 ) ;
64
81
65
- let runtime = RuntimeEnvBuilder :: new ( )
66
- . with_memory_limit ( 100 , 1.0 ) // Set memory limit to 100 bytes
67
- . with_disk_manager ( DiskManagerConfig :: NewOs ) // Enable DiskManager to allow spilling
68
- . build_arc ( )
69
- . unwrap ( ) ;
70
- let session_config = SessionConfig :: default ( ) ;
71
- let task_ctx = Arc :: new (
72
- TaskContext :: default ( )
73
- . with_session_config ( session_config . clone ( ) )
74
- . with_runtime ( Arc :: clone ( & runtime ) ) ,
75
- ) ;
82
+ // create a session context. enable spilling
83
+ let runtime_env = RuntimeEnvBuilder :: new ( )
84
+ . with_memory_limit ( 1024 * 1024 , 1.0 ) // Set memory limit to 1MB
85
+ . with_disk_manager ( DiskManagerConfig :: NewOs ) // Enable DiskManager to allow spilling
86
+ . build_arc ( )
87
+ . unwrap ( ) ;
88
+ let task_ctx = Arc :: new (
89
+ TaskContext :: default ( )
90
+ . with_session_config ( SessionConfig :: new ( ) )
91
+ . with_runtime ( Arc :: clone ( & runtime_env ) ) ,
92
+ ) ;
76
93
77
- group. bench_function ( "SortMergeJoinExec_spill" , |b| {
78
94
b. iter ( || {
79
- criterion:: black_box ( rt. block_on ( async {
80
- let stream = sort_merge_join_exec
81
- . execute ( 0 , Arc :: clone ( & task_ctx) )
82
- . unwrap ( ) ;
83
- collect ( stream) . await . unwrap ( )
84
- } ) )
85
- } )
95
+ let stream = join_exec. execute ( 0 , Arc :: clone ( & task_ctx) ) . unwrap ( ) ;
96
+ criterion:: black_box ( rt. block_on ( collect ( stream) ) ) . unwrap ( ) ;
97
+ } ) ;
98
+ // check if spilling happened
99
+ assert ! ( join_exec. metrics( ) . unwrap( ) . spilled_rows( ) . unwrap( ) > 0 ) ;
86
100
} ) ;
87
- group. finish ( ) ;
88
-
89
- assert ! (
90
- sort_merge_join_exec
91
- . metrics( )
92
- . unwrap( )
93
- . spill_count( )
94
- . unwrap( )
95
- > 0
96
- ) ;
97
- assert ! (
98
- sort_merge_join_exec
99
- . metrics( )
100
- . unwrap( )
101
- . spilled_bytes( )
102
- . unwrap( )
103
- > 0
104
- ) ;
105
- assert ! (
106
- sort_merge_join_exec
107
- . metrics( )
108
- . unwrap( )
109
- . spilled_rows( )
110
- . unwrap( )
111
- > 0
112
- ) ;
113
101
}
114
102
115
- criterion_group ! ( benches, bench_spill) ;
103
+ criterion_group ! (
104
+ name = benches;
105
+ config = Criterion :: default ( ) . sample_size( 10 ) ;
106
+ targets = bench_spill
107
+ ) ;
116
108
criterion_main ! ( benches) ;
0 commit comments