@@ -33,7 +33,7 @@ use crate::data_type::private::ParquetValueType;
33
33
use crate :: data_type:: * ;
34
34
use crate :: encodings:: levels:: LevelEncoder ;
35
35
use crate :: errors:: { ParquetError , Result } ;
36
- use crate :: file:: metadata:: { ColumnIndexBuilder , OffsetIndexBuilder } ;
36
+ use crate :: file:: metadata:: { ColumnIndexBuilder , LevelHistogram , OffsetIndexBuilder } ;
37
37
use crate :: file:: properties:: EnabledStatistics ;
38
38
use crate :: file:: statistics:: { Statistics , ValueStatistics } ;
39
39
use crate :: file:: {
@@ -189,6 +189,54 @@ struct PageMetrics {
189
189
num_buffered_values : u32 ,
190
190
num_buffered_rows : u32 ,
191
191
num_page_nulls : u64 ,
192
+ repetition_level_histogram : Option < LevelHistogram > ,
193
+ definition_level_histogram : Option < LevelHistogram > ,
194
+ }
195
+
196
+ impl PageMetrics {
197
+ fn new ( ) -> Self {
198
+ Default :: default ( )
199
+ }
200
+
201
+ /// Initialize the repetition level histogram
202
+ fn with_repetition_level_histogram ( mut self , max_level : i16 ) -> Self {
203
+ self . repetition_level_histogram = LevelHistogram :: try_new ( max_level) ;
204
+ self
205
+ }
206
+
207
+ /// Initialize the definition level histogram
208
+ fn with_definition_level_histogram ( mut self , max_level : i16 ) -> Self {
209
+ self . definition_level_histogram = LevelHistogram :: try_new ( max_level) ;
210
+ self
211
+ }
212
+
213
+ /// Resets the state of this `PageMetrics` to the initial state.
214
+ /// If histograms have been initialized their contents will be reset to zero.
215
+ fn new_page ( & mut self ) {
216
+ self . num_buffered_values = 0 ;
217
+ self . num_buffered_rows = 0 ;
218
+ self . num_page_nulls = 0 ;
219
+ self . repetition_level_histogram
220
+ . as_mut ( )
221
+ . map ( LevelHistogram :: reset) ;
222
+ self . definition_level_histogram
223
+ . as_mut ( )
224
+ . map ( LevelHistogram :: reset) ;
225
+ }
226
+
227
+ /// Updates histogram values using provided repetition levels
228
+ fn update_repetition_level_histogram ( & mut self , levels : & [ i16 ] ) {
229
+ if let Some ( ref mut rep_hist) = self . repetition_level_histogram {
230
+ rep_hist. update_from_levels ( levels) ;
231
+ }
232
+ }
233
+
234
+ /// Updates histogram values using provided definition levels
235
+ fn update_definition_level_histogram ( & mut self , levels : & [ i16 ] ) {
236
+ if let Some ( ref mut def_hist) = self . definition_level_histogram {
237
+ def_hist. update_from_levels ( levels) ;
238
+ }
239
+ }
192
240
}
193
241
194
242
// Metrics per column writer
@@ -206,13 +254,50 @@ struct ColumnMetrics<T: Default> {
206
254
num_column_nulls : u64 ,
207
255
column_distinct_count : Option < u64 > ,
208
256
variable_length_bytes : Option < i64 > ,
257
+ repetition_level_histogram : Option < LevelHistogram > ,
258
+ definition_level_histogram : Option < LevelHistogram > ,
209
259
}
210
260
211
261
impl < T : Default > ColumnMetrics < T > {
212
262
fn new ( ) -> Self {
213
263
Default :: default ( )
214
264
}
215
265
266
+ /// Initialize the repetition level histogram
267
+ fn with_repetition_level_histogram ( mut self , max_level : i16 ) -> Self {
268
+ self . repetition_level_histogram = LevelHistogram :: try_new ( max_level) ;
269
+ self
270
+ }
271
+
272
+ /// Initialize the definition level histogram
273
+ fn with_definition_level_histogram ( mut self , max_level : i16 ) -> Self {
274
+ self . definition_level_histogram = LevelHistogram :: try_new ( max_level) ;
275
+ self
276
+ }
277
+
278
+ /// Sum `page_histogram` into `chunk_histogram`
279
+ fn update_histogram (
280
+ chunk_histogram : & mut Option < LevelHistogram > ,
281
+ page_histogram : & Option < LevelHistogram > ,
282
+ ) {
283
+ if let ( Some ( page_hist) , Some ( chunk_hist) ) = ( page_histogram, chunk_histogram) {
284
+ chunk_hist. add ( page_hist) ;
285
+ }
286
+ }
287
+
288
+ /// Sum the provided PageMetrics histograms into the chunk histograms. Does nothing if
289
+ /// page histograms are not initialized.
290
+ fn update_from_page_metrics ( & mut self , page_metrics : & PageMetrics ) {
291
+ ColumnMetrics :: < T > :: update_histogram (
292
+ & mut self . definition_level_histogram ,
293
+ & page_metrics. definition_level_histogram ,
294
+ ) ;
295
+ ColumnMetrics :: < T > :: update_histogram (
296
+ & mut self . repetition_level_histogram ,
297
+ & page_metrics. repetition_level_histogram ,
298
+ ) ;
299
+ }
300
+
216
301
/// Sum the provided page variable_length_bytes into the chunk variable_length_bytes
217
302
fn update_variable_length_bytes ( & mut self , variable_length_bytes : Option < i64 > ) {
218
303
if let Some ( var_bytes) = variable_length_bytes {
@@ -275,6 +360,19 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
275
360
// Used for level information
276
361
encodings. insert ( Encoding :: RLE ) ;
277
362
363
+ let mut page_metrics = PageMetrics :: new ( ) ;
364
+ let mut column_metrics = ColumnMetrics :: < E :: T > :: new ( ) ;
365
+
366
+ // Initialize level histograms if collecting page or chunk statistics
367
+ if statistics_enabled != EnabledStatistics :: None {
368
+ page_metrics = page_metrics
369
+ . with_repetition_level_histogram ( descr. max_rep_level ( ) )
370
+ . with_definition_level_histogram ( descr. max_def_level ( ) ) ;
371
+ column_metrics = column_metrics
372
+ . with_repetition_level_histogram ( descr. max_rep_level ( ) )
373
+ . with_definition_level_histogram ( descr. max_def_level ( ) )
374
+ }
375
+
278
376
// Disable column_index_builder if not collecting page statistics.
279
377
let mut column_index_builder = ColumnIndexBuilder :: new ( ) ;
280
378
if statistics_enabled != EnabledStatistics :: Page {
@@ -292,12 +390,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
292
390
def_levels_sink : vec ! [ ] ,
293
391
rep_levels_sink : vec ! [ ] ,
294
392
data_pages : VecDeque :: new ( ) ,
295
- page_metrics : PageMetrics {
296
- num_buffered_values : 0 ,
297
- num_buffered_rows : 0 ,
298
- num_page_nulls : 0 ,
299
- } ,
300
- column_metrics : ColumnMetrics :: < E :: T > :: new ( ) ,
393
+ page_metrics,
394
+ column_metrics,
301
395
column_index_builder,
302
396
offset_index_builder : OffsetIndexBuilder :: new ( ) ,
303
397
encodings,
@@ -547,6 +641,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
547
641
}
548
642
}
549
643
644
+ // Update histogram
645
+ self . page_metrics . update_definition_level_histogram ( levels) ;
646
+
550
647
self . def_levels_sink . extend_from_slice ( levels) ;
551
648
values_to_write
552
649
} else {
@@ -575,6 +672,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
575
672
self . page_metrics . num_buffered_rows += ( level == 0 ) as u32
576
673
}
577
674
675
+ // Update histogram
676
+ self . page_metrics . update_repetition_level_histogram ( levels) ;
677
+
578
678
self . rep_levels_sink . extend_from_slice ( levels) ;
579
679
} else {
580
680
// Each value is exactly one row.
@@ -718,7 +818,14 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
718
818
}
719
819
}
720
820
}
721
- // update the offset index
821
+
822
+ // Append page histograms to the `ColumnIndex` histograms
823
+ self . column_index_builder . append_histograms (
824
+ & self . page_metrics . repetition_level_histogram ,
825
+ & self . page_metrics . definition_level_histogram ,
826
+ ) ;
827
+
828
+ // Update the offset index
722
829
self . offset_index_builder
723
830
. append_row_count ( self . page_metrics . num_buffered_rows as i64 ) ;
724
831
@@ -804,7 +911,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
804
911
values_data. variable_length_bytes ,
805
912
) ;
806
913
807
- // Update variable_length_bytes in column_metrics
914
+ // Update histograms and variable_length_bytes in column_metrics
915
+ self . column_metrics
916
+ . update_from_page_metrics ( & self . page_metrics ) ;
808
917
self . column_metrics
809
918
. update_variable_length_bytes ( values_data. variable_length_bytes ) ;
810
919
@@ -911,7 +1020,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
911
1020
// Reset state.
912
1021
self . rep_levels_sink . clear ( ) ;
913
1022
self . def_levels_sink . clear ( ) ;
914
- self . page_metrics = PageMetrics :: default ( ) ;
1023
+ self . page_metrics . new_page ( ) ;
915
1024
916
1025
Ok ( ( ) )
917
1026
}
@@ -1019,7 +1128,13 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
1019
1128
1020
1129
builder = builder
1021
1130
. set_statistics ( statistics)
1022
- . set_unencoded_byte_array_data_bytes ( self . column_metrics . variable_length_bytes ) ;
1131
+ . set_unencoded_byte_array_data_bytes ( self . column_metrics . variable_length_bytes )
1132
+ . set_repetition_level_histogram (
1133
+ self . column_metrics . repetition_level_histogram . take ( ) ,
1134
+ )
1135
+ . set_definition_level_histogram (
1136
+ self . column_metrics . definition_level_histogram . take ( ) ,
1137
+ ) ;
1023
1138
}
1024
1139
1025
1140
let metadata = builder. build ( ) ?;
0 commit comments