//! Benchmarks for extracting arrow statistics from parquet

use arrow::array::{ArrayRef, DictionaryArray, Float64Array, StringArray, UInt64Array};
-use arrow_array::{Int32Array, RecordBatch};
+use arrow_array::{Int32Array, Int64Array, RecordBatch};
use arrow_schema::{
    DataType::{self, *},
    Field, Schema,
};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use datafusion::datasource::physical_plan::parquet::StatisticsConverter;
-use parquet::arrow::{arrow_reader::ArrowReaderBuilder, ArrowWriter};
-use parquet::file::properties::WriterProperties;
+use parquet::{
+    arrow::arrow_reader::ArrowReaderOptions, file::properties::WriterProperties,
+};
+use parquet::{
+    arrow::{arrow_reader::ArrowReaderBuilder, ArrowWriter},
+    file::properties::EnabledStatistics,
+};
use std::sync::Arc;
use tempfile::NamedTempFile;
#[derive(Debug, Clone)]
enum TestTypes {
    UInt64,
+    Int64,
    F64,
    String,
    Dictionary,
@@ -43,18 +49,26 @@ impl fmt::Display for TestTypes {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            TestTypes::UInt64 => write!(f, "UInt64"),
+            TestTypes::Int64 => write!(f, "Int64"),
            TestTypes::F64 => write!(f, "F64"),
            TestTypes::String => write!(f, "String"),
            TestTypes::Dictionary => write!(f, "Dictionary(Int32, String)"),
        }
    }
}

-fn create_parquet_file(dtype: TestTypes, row_groups: usize) -> NamedTempFile {
+fn create_parquet_file(
+    dtype: TestTypes,
+    row_groups: usize,
+    data_page_row_count_limit: &Option<usize>,
+) -> NamedTempFile {
    let schema = match dtype {
        TestTypes::UInt64 => {
            Arc::new(Schema::new(vec![Field::new("col", DataType::UInt64, true)]))
        }
+        TestTypes::Int64 => {
+            Arc::new(Schema::new(vec![Field::new("col", DataType::Int64, true)]))
+        }
        TestTypes::F64 => Arc::new(Schema::new(vec![Field::new(
            "col",
            DataType::Float64,
@@ -70,7 +84,14 @@ fn create_parquet_file(dtype: TestTypes, row_groups: usize) -> NamedTempFile {
        )])),
    };

-    let props = WriterProperties::builder().build();
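+    // When a page row-count limit is given, also enable page-level statistics
+    // (EnabledStatistics::Page) so every data page gets its own min/max/null counts.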
+    let mut props = WriterProperties::builder().set_max_row_group_size(row_groups);
+    if let Some(limit) = data_page_row_count_limit {
+        props = props
+            .set_data_page_row_count_limit(*limit)
+            .set_statistics_enabled(EnabledStatistics::Page);
+    }
+    let props = props.build();
+
    let file = tempfile::Builder::new()
        .suffix(".parquet")
        .tempfile()
@@ -82,11 +103,21 @@ fn create_parquet_file(dtype: TestTypes, row_groups: usize) -> NamedTempFile {
    for _ in 0..row_groups {
        let batch = match dtype {
            TestTypes::UInt64 => make_uint64_batch(),
+            TestTypes::Int64 => make_int64_batch(),
            TestTypes::F64 => make_f64_batch(),
            TestTypes::String => make_string_batch(),
            TestTypes::Dictionary => make_dict_batch(),
        };
-        writer.write(&batch).unwrap();
+        if data_page_row_count_limit.is_some() {
+            // Write rows one at a time so the writer can apply the page
+            // limit, which is only checked on RecordBatch boundaries.
+            for i in 0..batch.num_rows() {
+                writer.write(&batch.slice(i, 1)).unwrap();
+            }
+        } else {
+            writer.write(&batch).unwrap();
+        }
    }
    writer.close().unwrap();
    file
@@ -109,6 +140,23 @@ fn make_uint64_batch() -> RecordBatch {
    .unwrap()
}

+fn make_int64_batch() -> RecordBatch {
+    let array: ArrayRef = Arc::new(Int64Array::from(vec![
+        Some(1),
+        Some(2),
+        Some(3),
+        Some(4),
+        Some(5),
+    ]));
+    RecordBatch::try_new(
+        Arc::new(arrow::datatypes::Schema::new(vec![
+            arrow::datatypes::Field::new("col", Int64, false),
+        ])),
+        vec![array],
+    )
+    .unwrap()
+}
+
fn make_f64_batch() -> RecordBatch {
    let array: ArrayRef = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0]));
    RecordBatch::try_new(
@@ -150,36 +198,88 @@ fn make_dict_batch() -> RecordBatch {
fn criterion_benchmark(c: &mut Criterion) {
    let row_groups = 100;
    use TestTypes::*;
-    let types = vec![UInt64, F64, String, Dictionary];
+    let types = vec![Int64, UInt64, F64, String, Dictionary];
+    let data_page_row_count_limits = vec![None, Some(1)];

    for dtype in types {
-        let file = create_parquet_file(dtype.clone(), row_groups);
-        let file = file.reopen().unwrap();
-        let reader = ArrowReaderBuilder::try_new(file).unwrap();
-        let metadata = reader.metadata();
-        let row_groups = metadata.row_groups();
-
-        let mut group =
-            c.benchmark_group(format!("Extract statistics for {}", dtype.clone()));
-        group.bench_function(
-            BenchmarkId::new("extract_statistics", dtype.clone()),
-            |b| {
-                b.iter(|| {
-                    let converter = StatisticsConverter::try_new(
-                        "col",
-                        reader.schema(),
-                        reader.parquet_schema(),
-                    )
-                    .unwrap();
-
-                    let _ = converter.row_group_mins(row_groups.iter()).unwrap();
-                    let _ = converter.row_group_maxes(row_groups.iter()).unwrap();
-                    let _ = converter.row_group_null_counts(row_groups.iter()).unwrap();
-                    let _ = converter.row_group_row_counts(row_groups.iter()).unwrap();
-                })
-            },
-        );
-        group.finish();
+        for data_page_row_count_limit in &data_page_row_count_limits {
+            let file =
+                create_parquet_file(dtype.clone(), row_groups, data_page_row_count_limit);
+            let file = file.reopen().unwrap();
+            let options = ArrowReaderOptions::new().with_page_index(true);
+            let reader = ArrowReaderBuilder::try_new_with_options(file, options).unwrap();
+            let metadata = reader.metadata();
+            let row_groups = metadata.row_groups();
+            let row_group_indices: Vec<_> = (0..row_groups.len()).collect();
+
+            let statistic_type = if data_page_row_count_limit.is_some() {
+                "data page"
+            } else {
+                "row group"
+            };
+
+            let mut group = c.benchmark_group(format!(
+                "Extract {} statistics for {}",
+                statistic_type,
+                dtype.clone()
+            ));
+            group.bench_function(
+                BenchmarkId::new("extract_statistics", dtype.clone()),
+                |b| {
+                    b.iter(|| {
+                        let converter = StatisticsConverter::try_new(
+                            "col",
+                            reader.schema(),
+                            reader.parquet_schema(),
+                        )
+                        .unwrap();
+
+                        if data_page_row_count_limit.is_some() {
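+                            // Per-page min/max/null counts come from the column
+                            // index; the offset index locates each page and its
+                            // row count. Both are present only because the reader
+                            // was built with with_page_index(true).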
+                            let column_page_index = reader
+                                .metadata()
+                                .column_index()
+                                .expect("File should have column page indices");
+
+                            let column_offset_index = reader
+                                .metadata()
+                                .offset_index()
+                                .expect("File should have column offset indices");
+
+                            let _ = converter.data_page_mins(
+                                column_page_index,
+                                column_offset_index,
+                                &row_group_indices,
+                            );
+                            let _ = converter.data_page_maxes(
+                                column_page_index,
+                                column_offset_index,
+                                &row_group_indices,
+                            );
+                            let _ = converter.data_page_null_counts(
+                                column_page_index,
+                                column_offset_index,
+                                &row_group_indices,
+                            );
+                            let _ = converter.data_page_row_counts(
+                                column_offset_index,
+                                row_groups,
+                                &row_group_indices,
+                            );
+                        } else {
+                            let _ = converter.row_group_mins(row_groups.iter()).unwrap();
+                            let _ = converter.row_group_maxes(row_groups.iter()).unwrap();
+                            let _ = converter
+                                .row_group_null_counts(row_groups.iter())
+                                .unwrap();
+                            let _ = converter
+                                .row_group_row_counts(row_groups.iter())
+                                .unwrap();
+                        }
+                    })
+                },
+            );
+            group.finish();
+        }
    }
}
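
For context, here is a minimal standalone sketch of the two statistics paths this benchmark measures (row group level and data page level). The file name, column name, and error handling are illustrative assumptions, not part of this PR; the API calls mirror the ones in the diff above.

```rust
use datafusion::datasource::physical_plan::parquet::StatisticsConverter;
use parquet::arrow::arrow_reader::{ArrowReaderBuilder, ArrowReaderOptions};
use std::fs::File;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical input: a parquet file written with EnabledStatistics::Page
    // and a single column named "col".
    let file = File::open("data.parquet")?;

    // The page index must be requested explicitly, otherwise the column and
    // offset indices are not loaded into the metadata.
    let options = ArrowReaderOptions::new().with_page_index(true);
    let reader = ArrowReaderBuilder::try_new_with_options(file, options)?;

    let converter =
        StatisticsConverter::try_new("col", reader.schema(), reader.parquet_schema())?;

    // Row-group level statistics: one entry per row group.
    let row_groups = reader.metadata().row_groups();
    let mins = converter.row_group_mins(row_groups.iter())?;
    println!("row group mins: {mins:?}");

    // Data-page level statistics: one entry per data page.
    let row_group_indices: Vec<_> = (0..row_groups.len()).collect();
    let column_index = reader.metadata().column_index().expect("page index");
    let offset_index = reader.metadata().offset_index().expect("page index");
    let page_mins =
        converter.data_page_mins(column_index, offset_index, &row_group_indices)?;
    println!("data page mins: {page_mins:?}");

    Ok(())
}
```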