@@ -378,298 +378,3 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
         }
     }
 }
-
-#[cfg(test)]
-#[cfg(feature = "arrow")]
-#[cfg(feature = "async")]
-mod tests {
-    use std::sync::Arc;
-
-    use crate::file::metadata::{
-        ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter,
-        RowGroupMetaData,
-    };
-    use crate::file::properties::{EnabledStatistics, WriterProperties};
-    use crate::file::reader::{FileReader, SerializedFileReader};
-    use crate::{
-        arrow::ArrowWriter,
-        file::{page_index::index::Index, serialized_reader::ReadOptionsBuilder},
-    };
-    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
-    use arrow_schema::{DataType as ArrowDataType, Field, Schema};
-    use bytes::{BufMut, Bytes, BytesMut};
-
-    struct TestMetadata {
-        #[allow(dead_code)]
-        file_size: usize,
-        metadata: ParquetMetaData,
-    }
-
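-    /// Returns true if at least one row group carries a non-`Index::NONE`
-    /// column index for every one of its columns.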
-    fn has_page_index(metadata: &ParquetMetaData) -> bool {
-        match metadata.column_index() {
-            Some(column_index) => column_index
-                .iter()
-                .any(|rg_idx| rg_idx.iter().all(|col_idx| !matches!(col_idx, Index::NONE))),
-            None => false,
-        }
-    }
-
-    #[test]
-    fn test_roundtrip_parquet_metadata_without_page_index() {
-        // We currently don't have an ad-hoc ParquetMetaData loader that can load
-        // page indexes, so we at least test the round trip without them
-        let metadata = get_test_metadata(false, false);
-        assert!(!has_page_index(&metadata.metadata));
-
-        let mut buf = BytesMut::new().writer();
-        {
-            let writer = ParquetMetaDataWriter::new(&mut buf, &metadata.metadata);
-            writer.finish().unwrap();
-        }
-
-        let data = buf.into_inner().freeze();
-
-        let decoded_metadata = ParquetMetaDataReader::new()
-            .parse_and_finish(&data)
-            .unwrap();
-        assert!(!has_page_index(&decoded_metadata));
-
-        assert_eq!(metadata.metadata, decoded_metadata);
-    }
-
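-    /// Writes a small Parquet file to an in-memory buffer and returns its
-    /// metadata. `write_page_index` selects page-level vs. chunk-level
-    /// statistics when writing; `read_page_index` controls whether the page
-    /// index is read back.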
-    fn get_test_metadata(write_page_index: bool, read_page_index: bool) -> TestMetadata {
-        let mut buf = BytesMut::new().writer();
-        let schema: Arc<Schema> = Arc::new(Schema::new(vec![Field::new(
-            "a",
-            ArrowDataType::Int32,
-            true,
-        )]));
-
-        // build row groups / pages that exercise different combinations of nulls and values
-        // note that below we set the row group and page sizes to 4 and 2 respectively
-        // so that these "groupings" make sense
-        let a: ArrayRef = Arc::new(Int32Array::from(vec![
-            // a row group that has all values
-            Some(i32::MIN),
-            Some(-1),
-            Some(1),
-            Some(i32::MAX),
-            // a row group with a page of all nulls and a page of all values
-            None,
-            None,
-            Some(2),
-            Some(3),
-            // a row group that has all null pages
-            None,
-            None,
-            None,
-            None,
-            // a row group having 1 page with all values and 1 page with some nulls
-            Some(4),
-            Some(5),
-            None,
-            Some(6),
-            // a row group having 1 page with all nulls and 1 page with some nulls
-            None,
-            None,
-            Some(7),
-            None,
-            // a row group having all pages with some nulls
-            None,
-            Some(8),
-            Some(9),
-            None,
-        ]));
-
-        let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();
-
-        let writer_props_builder = match write_page_index {
-            true => WriterProperties::builder().set_statistics_enabled(EnabledStatistics::Page),
-            false => WriterProperties::builder().set_statistics_enabled(EnabledStatistics::Chunk),
-        };
-
-        // tune the size of pages to the data above
-        // to make sure we exercise code paths where all items in a page are null, etc.
-        let writer_props = writer_props_builder
-            .set_max_row_group_size(4)
-            .set_data_page_row_count_limit(2)
-            .set_write_batch_size(2)
-            .build();
-
-        let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(writer_props)).unwrap();
-        writer.write(&batch).unwrap();
-        writer.close().unwrap();
-
-        let data = buf.into_inner().freeze();
-
-        let reader_opts = match read_page_index {
-            true => ReadOptionsBuilder::new().with_page_index().build(),
-            false => ReadOptionsBuilder::new().build(),
-        };
-        let reader = SerializedFileReader::new_with_options(data.clone(), reader_opts).unwrap();
-        let metadata = reader.metadata().clone();
-        TestMetadata {
-            file_size: data.len(),
-            metadata,
-        }
-    }
-
-    /// Temporary function so we can test loading metadata with page indexes
-    /// while we haven't fully figured out how to load it cleanly
-    async fn load_metadata_from_bytes(file_size: usize, data: Bytes) -> ParquetMetaData {
-        use crate::arrow::async_reader::MetadataFetch;
-        use crate::errors::Result as ParquetResult;
-        use futures::future::BoxFuture;
-        use futures::FutureExt;
-        use std::ops::Range;
-
-        /// Adapt a `Bytes` to a `MetadataFetch` implementation.
-        struct AsyncBytes {
-            data: Bytes,
-        }
-
-        impl AsyncBytes {
-            fn new(data: Bytes) -> Self {
-                Self { data }
-            }
-        }
-
-        impl MetadataFetch for AsyncBytes {
-            fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
-                async move { Ok(self.data.slice(range.start..range.end)) }.boxed()
-            }
-        }
-
-        /// A `MetadataFetch` implementation that reads from a subset of the full data
-        /// while accepting ranges that address the full data.
-        struct MaskedBytes {
-            inner: Box<dyn MetadataFetch + Send>,
-            inner_range: Range<usize>,
-        }
-
-        impl MaskedBytes {
-            fn new(inner: Box<dyn MetadataFetch + Send>, inner_range: Range<usize>) -> Self {
-                Self { inner, inner_range }
-            }
-        }
-
-        impl MetadataFetch for &mut MaskedBytes {
-            fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
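-                // Requested ranges address the full file, but the wrapped bytes
-                // only cover `inner_range`; shift the range into local
-                // coordinates before delegating to the inner fetch.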
-                let inner_range = self.inner_range.clone();
-                println!("inner_range: {:?}", inner_range);
-                println!("range: {:?}", range);
-                assert!(inner_range.start <= range.start && inner_range.end >= range.end);
-                let range =
-                    range.start - self.inner_range.start..range.end - self.inner_range.start;
-                self.inner.fetch(range)
-            }
-        }
-
-        let metadata_length = data.len();
-        let mut reader = MaskedBytes::new(
-            Box::new(AsyncBytes::new(data)),
-            file_size - metadata_length..file_size,
-        );
-        ParquetMetaDataReader::new()
-            .with_page_indexes(true)
-            .load_and_finish(&mut reader, file_size)
-            .await
-            .unwrap()
-    }
-
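-    /// Compares the fields of two column chunks that are expected to survive a
-    /// metadata rewrite; the page index offsets themselves may differ, so only
-    /// their lengths are compared.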
-    fn check_columns_are_equivalent(left: &ColumnChunkMetaData, right: &ColumnChunkMetaData) {
-        assert_eq!(left.column_descr(), right.column_descr());
-        assert_eq!(left.encodings(), right.encodings());
-        assert_eq!(left.num_values(), right.num_values());
-        assert_eq!(left.compressed_size(), right.compressed_size());
-        assert_eq!(left.data_page_offset(), right.data_page_offset());
-        assert_eq!(left.statistics(), right.statistics());
-        assert_eq!(left.offset_index_length(), right.offset_index_length());
-        assert_eq!(left.column_index_length(), right.column_index_length());
-        assert_eq!(
-            left.unencoded_byte_array_data_bytes(),
-            right.unencoded_byte_array_data_bytes()
-        );
-    }
-
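-    /// Row-group counterpart of `check_columns_are_equivalent`: compares the
-    /// invariant row group fields, then each pair of column chunks.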
-    fn check_row_groups_are_equivalent(left: &RowGroupMetaData, right: &RowGroupMetaData) {
-        assert_eq!(left.num_rows(), right.num_rows());
-        assert_eq!(left.file_offset(), right.file_offset());
-        assert_eq!(left.total_byte_size(), right.total_byte_size());
-        assert_eq!(left.schema_descr(), right.schema_descr());
-        assert_eq!(left.num_columns(), right.num_columns());
-        left.columns()
-            .iter()
-            .zip(right.columns().iter())
-            .for_each(|(lc, rc)| {
-                check_columns_are_equivalent(lc, rc);
-            });
-    }
-
-    #[tokio::test]
-    async fn test_encode_parquet_metadata_with_page_index() {
-        // Create a ParquetMetaData with page index information
-        let metadata = get_test_metadata(true, true);
-        assert!(has_page_index(&metadata.metadata));
-
-        let mut buf = BytesMut::new().writer();
-        {
-            let writer = ParquetMetaDataWriter::new(&mut buf, &metadata.metadata);
-            writer.finish().unwrap();
-        }
-
-        let data = buf.into_inner().freeze();
-
-        let decoded_metadata = load_metadata_from_bytes(data.len(), data).await;
-
-        // Because the page index offsets will differ, compare only the invariant
-        // parts of the metadata
-        assert_eq!(
-            metadata.metadata.file_metadata(),
-            decoded_metadata.file_metadata()
-        );
-        assert_eq!(
-            metadata.metadata.column_index(),
-            decoded_metadata.column_index()
-        );
-        assert_eq!(
-            metadata.metadata.offset_index(),
-            decoded_metadata.offset_index()
-        );
-        assert_eq!(
-            metadata.metadata.num_row_groups(),
-            decoded_metadata.num_row_groups()
-        );
-
-        // Check that the mins and maxes are what we expect for each page, which
-        // also indirectly checks that the pages were written out with the layout
-        // we expected. (If they were not, or a future refactor breaks that
-        // assumption, this test may have to drop down to a lower level and
-        // create the metadata directly instead of writing an entire file.)
-        let column_indexes = metadata.metadata.column_index().unwrap();
-        assert_eq!(column_indexes.len(), 6);
-        // Collect the page count for each column of each row group; every entry
-        // should be 2, giving 12 pages in total across 6 row groups of 1 column.
-        let mut page_counts = vec![];
-        for row_group in column_indexes {
-            for column in row_group {
-                match column {
-                    Index::INT32(column_index) => {
-                        page_counts.push(column_index.indexes.len());
-                    }
-                    _ => panic!("unexpected column index type"),
-                }
-            }
-        }
-        assert_eq!(page_counts, vec![2; 6]);
-
-
-        metadata
-            .metadata
-            .row_groups()
-            .iter()
-            .zip(decoded_metadata.row_groups().iter())
-            .for_each(|(left, right)| {
-                check_row_groups_are_equivalent(left, right);
-            });
-    }
-}