@@ -44,108 +44,102 @@ use hashbrown::HashMap;
44
44
use rand:: rngs:: StdRng ;
45
45
use rand:: { Rng , SeedableRng } ;
46
46
47
- #[ cfg( test) ]
48
- #[ allow( clippy:: items_after_test_module) ]
49
- mod tests {
50
- use super :: * ;
51
-
52
- use datafusion_physical_plan:: windows:: PartitionSearchMode :: {
53
- Linear , PartiallySorted , Sorted ,
54
- } ;
47
+ use datafusion_physical_plan:: windows:: PartitionSearchMode :: {
48
+ Linear , PartiallySorted , Sorted ,
49
+ } ;
55
50
56
- #[ tokio:: test( flavor = "multi_thread" , worker_threads = 16 ) ]
57
- async fn window_bounded_window_random_comparison ( ) -> Result < ( ) > {
58
- // make_staggered_batches gives result sorted according to a, b, c
59
- // In the test cases first entry represents partition by columns
60
- // Second entry represents order by columns.
61
- // Third entry represents search mode.
62
- // In sorted mode physical plans are in the form for WindowAggExec
63
- //```
64
- // WindowAggExec
65
- // MemoryExec]
66
- // ```
67
- // and in the form for BoundedWindowAggExec
68
- // ```
69
- // BoundedWindowAggExec
70
- // MemoryExec
71
- // ```
72
- // In Linear and PartiallySorted mode physical plans are in the form for WindowAggExec
73
- //```
74
- // WindowAggExec
75
- // SortExec(required by window function)
76
- // MemoryExec]
77
- // ```
78
- // and in the form for BoundedWindowAggExec
79
- // ```
80
- // BoundedWindowAggExec
81
- // MemoryExec
82
- // ```
83
- let test_cases = vec ! [
84
- ( vec![ "a" ] , vec![ "a" ] , Sorted ) ,
85
- ( vec![ "a" ] , vec![ "b" ] , Sorted ) ,
86
- ( vec![ "a" ] , vec![ "a" , "b" ] , Sorted ) ,
87
- ( vec![ "a" ] , vec![ "b" , "c" ] , Sorted ) ,
88
- ( vec![ "a" ] , vec![ "a" , "b" , "c" ] , Sorted ) ,
89
- ( vec![ "b" ] , vec![ "a" ] , Linear ) ,
90
- ( vec![ "b" ] , vec![ "a" , "b" ] , Linear ) ,
91
- ( vec![ "b" ] , vec![ "a" , "c" ] , Linear ) ,
92
- ( vec![ "b" ] , vec![ "a" , "b" , "c" ] , Linear ) ,
93
- ( vec![ "c" ] , vec![ "a" ] , Linear ) ,
94
- ( vec![ "c" ] , vec![ "a" , "b" ] , Linear ) ,
95
- ( vec![ "c" ] , vec![ "a" , "c" ] , Linear ) ,
96
- ( vec![ "c" ] , vec![ "a" , "b" , "c" ] , Linear ) ,
97
- ( vec![ "b" , "a" ] , vec![ "a" ] , Sorted ) ,
98
- ( vec![ "b" , "a" ] , vec![ "b" ] , Sorted ) ,
99
- ( vec![ "b" , "a" ] , vec![ "c" ] , Sorted ) ,
100
- ( vec![ "b" , "a" ] , vec![ "a" , "b" ] , Sorted ) ,
101
- ( vec![ "b" , "a" ] , vec![ "b" , "c" ] , Sorted ) ,
102
- ( vec![ "b" , "a" ] , vec![ "a" , "c" ] , Sorted ) ,
103
- ( vec![ "b" , "a" ] , vec![ "a" , "b" , "c" ] , Sorted ) ,
104
- ( vec![ "c" , "b" ] , vec![ "a" ] , Linear ) ,
105
- ( vec![ "c" , "b" ] , vec![ "a" , "b" ] , Linear ) ,
106
- ( vec![ "c" , "b" ] , vec![ "a" , "c" ] , Linear ) ,
107
- ( vec![ "c" , "b" ] , vec![ "a" , "b" , "c" ] , Linear ) ,
108
- ( vec![ "c" , "a" ] , vec![ "a" ] , PartiallySorted ( vec![ 1 ] ) ) ,
109
- ( vec![ "c" , "a" ] , vec![ "b" ] , PartiallySorted ( vec![ 1 ] ) ) ,
110
- ( vec![ "c" , "a" ] , vec![ "c" ] , PartiallySorted ( vec![ 1 ] ) ) ,
111
- ( vec![ "c" , "a" ] , vec![ "a" , "b" ] , PartiallySorted ( vec![ 1 ] ) ) ,
112
- ( vec![ "c" , "a" ] , vec![ "b" , "c" ] , PartiallySorted ( vec![ 1 ] ) ) ,
113
- ( vec![ "c" , "a" ] , vec![ "a" , "c" ] , PartiallySorted ( vec![ 1 ] ) ) ,
114
- (
115
- vec![ "c" , "a" ] ,
116
- vec![ "a" , "b" , "c" ] ,
117
- PartiallySorted ( vec![ 1 ] ) ,
118
- ) ,
119
- ( vec![ "c" , "b" , "a" ] , vec![ "a" ] , Sorted ) ,
120
- ( vec![ "c" , "b" , "a" ] , vec![ "b" ] , Sorted ) ,
121
- ( vec![ "c" , "b" , "a" ] , vec![ "c" ] , Sorted ) ,
122
- ( vec![ "c" , "b" , "a" ] , vec![ "a" , "b" ] , Sorted ) ,
123
- ( vec![ "c" , "b" , "a" ] , vec![ "b" , "c" ] , Sorted ) ,
124
- ( vec![ "c" , "b" , "a" ] , vec![ "a" , "c" ] , Sorted ) ,
125
- ( vec![ "c" , "b" , "a" ] , vec![ "a" , "b" , "c" ] , Sorted ) ,
126
- ] ;
127
- let n = 300 ;
128
- let n_distincts = vec ! [ 10 , 20 ] ;
129
- for n_distinct in n_distincts {
130
- let mut handles = Vec :: new ( ) ;
131
- for i in 0 ..n {
132
- let idx = i % test_cases. len ( ) ;
133
- let ( pb_cols, ob_cols, search_mode) = test_cases[ idx] . clone ( ) ;
134
- let job = tokio:: spawn ( run_window_test (
135
- make_staggered_batches :: < true > ( 1000 , n_distinct, i as u64 ) ,
136
- i as u64 ,
137
- pb_cols,
138
- ob_cols,
139
- search_mode,
140
- ) ) ;
141
- handles. push ( job) ;
142
- }
143
- for job in handles {
144
- job. await . unwrap ( ) ?;
145
- }
51
+ #[ tokio:: test( flavor = "multi_thread" , worker_threads = 16 ) ]
52
+ async fn window_bounded_window_random_comparison ( ) -> Result < ( ) > {
53
+ // make_staggered_batches gives result sorted according to a, b, c
54
+ // In the test cases first entry represents partition by columns
55
+ // Second entry represents order by columns.
56
+ // Third entry represents search mode.
57
+ // In sorted mode physical plans are in the form for WindowAggExec
58
+ //```
59
+ // WindowAggExec
60
+ // MemoryExec]
61
+ // ```
62
+ // and in the form for BoundedWindowAggExec
63
+ // ```
64
+ // BoundedWindowAggExec
65
+ // MemoryExec
66
+ // ```
67
+ // In Linear and PartiallySorted mode physical plans are in the form for WindowAggExec
68
+ //```
69
+ // WindowAggExec
70
+ // SortExec(required by window function)
71
+ // MemoryExec]
72
+ // ```
73
+ // and in the form for BoundedWindowAggExec
74
+ // ```
75
+ // BoundedWindowAggExec
76
+ // MemoryExec
77
+ // ```
78
+ let test_cases = vec ! [
79
+ ( vec![ "a" ] , vec![ "a" ] , Sorted ) ,
80
+ ( vec![ "a" ] , vec![ "b" ] , Sorted ) ,
81
+ ( vec![ "a" ] , vec![ "a" , "b" ] , Sorted ) ,
82
+ ( vec![ "a" ] , vec![ "b" , "c" ] , Sorted ) ,
83
+ ( vec![ "a" ] , vec![ "a" , "b" , "c" ] , Sorted ) ,
84
+ ( vec![ "b" ] , vec![ "a" ] , Linear ) ,
85
+ ( vec![ "b" ] , vec![ "a" , "b" ] , Linear ) ,
86
+ ( vec![ "b" ] , vec![ "a" , "c" ] , Linear ) ,
87
+ ( vec![ "b" ] , vec![ "a" , "b" , "c" ] , Linear ) ,
88
+ ( vec![ "c" ] , vec![ "a" ] , Linear ) ,
89
+ ( vec![ "c" ] , vec![ "a" , "b" ] , Linear ) ,
90
+ ( vec![ "c" ] , vec![ "a" , "c" ] , Linear ) ,
91
+ ( vec![ "c" ] , vec![ "a" , "b" , "c" ] , Linear ) ,
92
+ ( vec![ "b" , "a" ] , vec![ "a" ] , Sorted ) ,
93
+ ( vec![ "b" , "a" ] , vec![ "b" ] , Sorted ) ,
94
+ ( vec![ "b" , "a" ] , vec![ "c" ] , Sorted ) ,
95
+ ( vec![ "b" , "a" ] , vec![ "a" , "b" ] , Sorted ) ,
96
+ ( vec![ "b" , "a" ] , vec![ "b" , "c" ] , Sorted ) ,
97
+ ( vec![ "b" , "a" ] , vec![ "a" , "c" ] , Sorted ) ,
98
+ ( vec![ "b" , "a" ] , vec![ "a" , "b" , "c" ] , Sorted ) ,
99
+ ( vec![ "c" , "b" ] , vec![ "a" ] , Linear ) ,
100
+ ( vec![ "c" , "b" ] , vec![ "a" , "b" ] , Linear ) ,
101
+ ( vec![ "c" , "b" ] , vec![ "a" , "c" ] , Linear ) ,
102
+ ( vec![ "c" , "b" ] , vec![ "a" , "b" , "c" ] , Linear ) ,
103
+ ( vec![ "c" , "a" ] , vec![ "a" ] , PartiallySorted ( vec![ 1 ] ) ) ,
104
+ ( vec![ "c" , "a" ] , vec![ "b" ] , PartiallySorted ( vec![ 1 ] ) ) ,
105
+ ( vec![ "c" , "a" ] , vec![ "c" ] , PartiallySorted ( vec![ 1 ] ) ) ,
106
+ ( vec![ "c" , "a" ] , vec![ "a" , "b" ] , PartiallySorted ( vec![ 1 ] ) ) ,
107
+ ( vec![ "c" , "a" ] , vec![ "b" , "c" ] , PartiallySorted ( vec![ 1 ] ) ) ,
108
+ ( vec![ "c" , "a" ] , vec![ "a" , "c" ] , PartiallySorted ( vec![ 1 ] ) ) ,
109
+ (
110
+ vec![ "c" , "a" ] ,
111
+ vec![ "a" , "b" , "c" ] ,
112
+ PartiallySorted ( vec![ 1 ] ) ,
113
+ ) ,
114
+ ( vec![ "c" , "b" , "a" ] , vec![ "a" ] , Sorted ) ,
115
+ ( vec![ "c" , "b" , "a" ] , vec![ "b" ] , Sorted ) ,
116
+ ( vec![ "c" , "b" , "a" ] , vec![ "c" ] , Sorted ) ,
117
+ ( vec![ "c" , "b" , "a" ] , vec![ "a" , "b" ] , Sorted ) ,
118
+ ( vec![ "c" , "b" , "a" ] , vec![ "b" , "c" ] , Sorted ) ,
119
+ ( vec![ "c" , "b" , "a" ] , vec![ "a" , "c" ] , Sorted ) ,
120
+ ( vec![ "c" , "b" , "a" ] , vec![ "a" , "b" , "c" ] , Sorted ) ,
121
+ ] ;
122
+ let n = 300 ;
123
+ let n_distincts = vec ! [ 10 , 20 ] ;
124
+ for n_distinct in n_distincts {
125
+ let mut handles = Vec :: new ( ) ;
126
+ for i in 0 ..n {
127
+ let idx = i % test_cases. len ( ) ;
128
+ let ( pb_cols, ob_cols, search_mode) = test_cases[ idx] . clone ( ) ;
129
+ let job = tokio:: spawn ( run_window_test (
130
+ make_staggered_batches :: < true > ( 1000 , n_distinct, i as u64 ) ,
131
+ i as u64 ,
132
+ pb_cols,
133
+ ob_cols,
134
+ search_mode,
135
+ ) ) ;
136
+ handles. push ( job) ;
137
+ }
138
+ for job in handles {
139
+ job. await . unwrap ( ) ?;
146
140
}
147
- Ok ( ( ) )
148
141
}
142
+ Ok ( ( ) )
149
143
}
150
144
151
145
fn get_random_function (
0 commit comments