@@ -1857,4 +1857,92 @@ mod tests {
1857
1857
assert_eq ! ( total_rows, expected) ;
1858
1858
}
1859
1859
}
1860
+
1861
+ #[ tokio:: test]
1862
+ async fn test_row_filter_nested ( ) {
1863
+ let a = StringArray :: from_iter_values ( [ "a" , "b" , "b" , "b" , "c" , "c" ] ) ;
1864
+ let b = StructArray :: from ( vec ! [
1865
+ (
1866
+ Arc :: new( Field :: new( "aa" , DataType :: Utf8 , true ) ) ,
1867
+ Arc :: new( StringArray :: from( vec![ "a" , "b" , "b" , "b" , "c" , "c" ] ) ) as ArrayRef ,
1868
+ ) ,
1869
+ (
1870
+ Arc :: new( Field :: new( "bb" , DataType :: Utf8 , true ) ) ,
1871
+ Arc :: new( StringArray :: from( vec![ "1" , "2" , "3" , "4" , "5" , "6" ] ) ) as ArrayRef ,
1872
+ ) ,
1873
+ ] ) ;
1874
+ let c = Int32Array :: from_iter ( 0 ..6 ) ;
1875
+ let data = RecordBatch :: try_from_iter ( [
1876
+ ( "a" , Arc :: new ( a) as ArrayRef ) ,
1877
+ ( "b" , Arc :: new ( b) as ArrayRef ) ,
1878
+ ( "c" , Arc :: new ( c) as ArrayRef ) ,
1879
+ ] )
1880
+ . unwrap ( ) ;
1881
+
1882
+ let mut buf = Vec :: with_capacity ( 1024 ) ;
1883
+ let mut writer = ArrowWriter :: try_new ( & mut buf, data. schema ( ) , None ) . unwrap ( ) ;
1884
+ writer. write ( & data) . unwrap ( ) ;
1885
+ writer. close ( ) . unwrap ( ) ;
1886
+
1887
+ let data: Bytes = buf. into ( ) ;
1888
+ let metadata = parse_metadata ( & data) . unwrap ( ) ;
1889
+ let parquet_schema = metadata. file_metadata ( ) . schema_descr_ptr ( ) ;
1890
+
1891
+ let test = TestReader {
1892
+ data,
1893
+ metadata : Arc :: new ( metadata) ,
1894
+ requests : Default :: default ( ) ,
1895
+ } ;
1896
+ let requests = test. requests . clone ( ) ;
1897
+
1898
+ let a_scalar = StringArray :: from_iter_values ( [ "b" ] ) ;
1899
+ let a_filter = ArrowPredicateFn :: new (
1900
+ ProjectionMask :: leaves ( & parquet_schema, vec ! [ 0 ] ) ,
1901
+ move |batch| eq ( batch. column ( 0 ) , & Scalar :: new ( & a_scalar) ) ,
1902
+ ) ;
1903
+
1904
+ let b_scalar = StringArray :: from_iter_values ( [ "4" ] ) ;
1905
+ let b_filter = ArrowPredicateFn :: new (
1906
+ ProjectionMask :: leaves ( & parquet_schema, vec ! [ 2 ] ) ,
1907
+ move |batch| {
1908
+ // Filter on the second element of the struct.
1909
+ let struct_array = batch
1910
+ . column ( 0 )
1911
+ . as_any ( )
1912
+ . downcast_ref :: < StructArray > ( )
1913
+ . unwrap ( ) ;
1914
+ eq ( struct_array. column ( 0 ) , & Scalar :: new ( & b_scalar) )
1915
+ } ,
1916
+ ) ;
1917
+
1918
+ let filter = RowFilter :: new ( vec ! [ Box :: new( a_filter) , Box :: new( b_filter) ] ) ;
1919
+
1920
+ let mask = ProjectionMask :: leaves ( & parquet_schema, vec ! [ 0 , 3 ] ) ;
1921
+ let stream = ParquetRecordBatchStreamBuilder :: new ( test)
1922
+ . await
1923
+ . unwrap ( )
1924
+ . with_projection ( mask. clone ( ) )
1925
+ . with_batch_size ( 1024 )
1926
+ . with_row_filter ( filter)
1927
+ . build ( )
1928
+ . unwrap ( ) ;
1929
+
1930
+ let batches: Vec < _ > = stream. try_collect ( ) . await . unwrap ( ) ;
1931
+ assert_eq ! ( batches. len( ) , 1 ) ;
1932
+
1933
+ let batch = & batches[ 0 ] ;
1934
+ assert_eq ! ( batch. num_rows( ) , 1 ) ;
1935
+ assert_eq ! ( batch. num_columns( ) , 2 ) ;
1936
+
1937
+ let col = batch. column ( 0 ) ;
1938
+ let val = col. as_any ( ) . downcast_ref :: < StringArray > ( ) . unwrap ( ) . value ( 0 ) ;
1939
+ assert_eq ! ( val, "b" ) ;
1940
+
1941
+ let col = batch. column ( 1 ) ;
1942
+ let val = col. as_any ( ) . downcast_ref :: < Int32Array > ( ) . unwrap ( ) . value ( 0 ) ;
1943
+ assert_eq ! ( val, 3 ) ;
1944
+
1945
+ // Should only have made 3 requests
1946
+ assert_eq ! ( requests. lock( ) . unwrap( ) . len( ) , 3 ) ;
1947
+ }
1860
1948
}
0 commit comments