From f81d6d9bba254c71d829ab978a534755093e5a8a Mon Sep 17 00:00:00 2001 From: Robin Bernon Date: Thu, 11 Mar 2021 12:38:01 +0000 Subject: [PATCH 01/11] Adding row_predicate field to Parquet struct. --- amadeus-derive/src/lib.rs | 1 + amadeus-parquet/src/internal/record/mod.rs | 2 +- amadeus-parquet/src/lib.rs | 6 +- benches/parquet.rs | 4 +- tests/parquet.rs | 98 +++++++++++----------- tests/parquet_dist.rs | 92 ++++++++++---------- 6 files changed, 104 insertions(+), 99 deletions(-) diff --git a/amadeus-derive/src/lib.rs b/amadeus-derive/src/lib.rs index 69f14a29..91ae2389 100644 --- a/amadeus-derive/src/lib.rs +++ b/amadeus-derive/src/lib.rs @@ -300,6 +300,7 @@ fn impl_struct( #visibility struct #reader_name #impl_generics #where_clause_with_parquet_data { #(#field_names1: <#field_types1 as __::ParquetData>::Reader,)* } + #[derive(Clone, Debug)] #visibility struct #predicate_name #impl_generics #where_clause_with_parquet_data { #(#field_names1: __::Option<<#field_types1 as __::ParquetData>::Predicate>,)* } diff --git a/amadeus-parquet/src/internal/record/mod.rs b/amadeus-parquet/src/internal/record/mod.rs index 9178ca7c..a12b38e5 100644 --- a/amadeus-parquet/src/internal/record/mod.rs +++ b/amadeus-parquet/src/internal/record/mod.rs @@ -154,7 +154,7 @@ pub trait ParquetData: Data + Sized { // Clone + PartialEq + Debug + 'static type Schema: Schema; type Reader: Reader; - type Predicate; + type Predicate: Clone + Debug; /// Parse a [`Type`] into `Self::Schema`, using `repetition` instead of /// `Type::get_basic_info().repetition()`. A `repetition` of `None` denotes a root diff --git a/amadeus-parquet/src/lib.rs b/amadeus-parquet/src/lib.rs index 3a501bdb..e283218c 100644 --- a/amadeus-parquet/src/lib.rs +++ b/amadeus-parquet/src/lib.rs @@ -79,6 +79,7 @@ mod wrap { Row: ParquetData, { partitions: Vec, + row_predicate: Option, marker: PhantomData Row>, } impl Parquet @@ -86,9 +87,10 @@ mod wrap { F: File, Row: ParquetData + 'static, { - pub async fn new(file: F) -> Result::Error> { + pub async fn new(file: F, row_predicate: Option) -> Result::Error> { Ok(Self { partitions: file.partitions().await.map_err(ParquetError::File)?, + row_predicate, marker: PhantomData, }) } @@ -115,6 +117,8 @@ mod wrap { } #[allow(clippy::let_and_return)] fn dist_stream(self) -> Self::DistStream { + let predicate = self.row_predicate.clone(); + self.partitions .into_dist_stream() .flat_map(FnMut!(|partition: F::Partition| async move { diff --git a/benches/parquet.rs b/benches/parquet.rs index 57d325d9..2562c15d 100644 --- a/benches/parquet.rs +++ b/benches/parquet.rs @@ -63,7 +63,7 @@ struct StockSimulated { fn parquet_10k(b: &mut Bencher) { let file = "amadeus-testing/parquet/10k-v2.parquet"; // 669,034 bytes run(b, file, || async { - let rows = Parquet::<_, TenKayVeeTwo>::new(PathBuf::from(file)) + let rows = Parquet::<_, TenKayVeeTwo>::new(PathBuf::from(file), None) .await .unwrap(); assert_eq!( @@ -80,7 +80,7 @@ fn parquet_10k(b: &mut Bencher) { fn parquet_stock(b: &mut Bencher) { let file = "amadeus-testing/parquet/stock_simulated.parquet"; // 1,289,419 bytes run(b, file, || async { - let rows = Parquet::<_, StockSimulated>::new(PathBuf::from(file)) + let rows = Parquet::<_, StockSimulated>::new(PathBuf::from(file), None) .await .unwrap(); assert_eq!( diff --git a/tests/parquet.rs b/tests/parquet.rs index dab7099a..0259def0 100644 --- a/tests/parquet.rs +++ b/tests/parquet.rs @@ -25,7 +25,7 @@ async fn parquet() { PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=05/part-00025-96c249f4-3a10-4509-b6b8-693a5d90dbf3.c000.snappy.parquet"), PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=06/part-00185-96c249f4-3a10-4509-b6b8-693a5d90dbf3.c000.snappy.parquet"), PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=07/part-00151-96c249f4-3a10-4509-b6b8-693a5d90dbf3.c000.snappy.parquet"), - ]).await.unwrap(); + ], None).await.unwrap(); assert_eq!( rows.par_stream() .map(|row: Result<_, _>| row.unwrap()) @@ -36,7 +36,7 @@ async fn parquet() { let rows = Parquet::<_, Value>::new(ParquetDirectory::new(PathBuf::from( "amadeus-testing/parquet/cf-accesslogs/", - ))) + )), None) .await .unwrap(); assert_eq!( @@ -47,9 +47,9 @@ async fn parquet() { 207_535 ); - #[cfg(feature = "aws")] + #[cfg(feature = "aws")] { - let rows = Parquet::<_, Value>::new(vec![S3File::new_with(AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/year=2018/month=11/day=03/part-00137-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet", AwsCredentials::Anonymous);20]).await.unwrap(); + let rows = Parquet::<_, Value>::new(vec![S3File::new_with(AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/year=2018/month=11/day=03/part-00137-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet", AwsCredentials::Anonymous);20], None).await.unwrap(); assert_eq!( rows.par_stream() .map(|row: Result<_, _>| row.unwrap()) @@ -63,7 +63,7 @@ async fn parquet() { "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/", AwsCredentials::Anonymous, - ))) + )), None) .await .unwrap(); assert_eq!( @@ -83,7 +83,7 @@ async fn parquet() { S3File::new_with(AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/year=2018/month=11/day=05/part-00025-96c249f4-3a10-4509-b6b8-693a5d90dbf3.c000.snappy.parquet", AwsCredentials::Anonymous), S3File::new_with(AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/year=2018/month=11/day=06/part-00185-96c249f4-3a10-4509-b6b8-693a5d90dbf3.c000.snappy.parquet", AwsCredentials::Anonymous), S3File::new_with(AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/year=2018/month=11/day=07/part-00151-96c249f4-3a10-4509-b6b8-693a5d90dbf3.c000.snappy.parquet", AwsCredentials::Anonymous), - ]).await.unwrap(); + ], None).await.unwrap(); assert_eq!( rows.par_stream() .map(|row: Result<_, _>| row.unwrap()) @@ -97,7 +97,7 @@ async fn parquet() { "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/", AwsCredentials::Anonymous, - ))) + )), None) .await .unwrap(); assert_eq!( @@ -136,7 +136,7 @@ async fn parquet() { } let rows = Parquet::<_, StockSimulatedDerived>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -149,7 +149,7 @@ async fn parquet() { let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -164,7 +164,7 @@ async fn parquet() { 42_000 ); - #[derive(Data, Clone, PartialEq, Debug)] + #[derive(Data, Clone, PartialEq, Debug)] struct StockSimulatedDerivedProjection1 { bs5: Option, __index_level_0__: Option, @@ -172,7 +172,7 @@ async fn parquet() { let rows = Parquet::<_, StockSimulatedDerivedProjection1>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -185,7 +185,7 @@ async fn parquet() { let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -205,7 +205,7 @@ async fn parquet() { let rows = Parquet::<_, StockSimulatedDerivedProjection2>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -218,7 +218,7 @@ async fn parquet() { let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -257,7 +257,7 @@ async fn parquet() { } let rows = - Parquet::<_, TenKayVeeTwo>::new(PathBuf::from("amadeus-testing/parquet/10k-v2.parquet")) + Parquet::<_, TenKayVeeTwo>::new(PathBuf::from("amadeus-testing/parquet/10k-v2.parquet"), None) .await .unwrap(); assert_eq!( @@ -270,7 +270,7 @@ async fn parquet() { let rows = Parquet::<_, TenKayVeeTwoDerived>::new(PathBuf::from( "amadeus-testing/parquet/10k-v2.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -281,7 +281,7 @@ async fn parquet() { 10_000 ); - let rows = Parquet::<_, Value>::new(PathBuf::from("amadeus-testing/parquet/10k-v2.parquet")) + let rows = Parquet::<_, Value>::new(PathBuf::from("amadeus-testing/parquet/10k-v2.parquet"), None) .await .unwrap(); assert_eq!( @@ -328,7 +328,7 @@ async fn parquet() { let rows = Parquet::<_, AlltypesDictionary>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_dictionary.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -341,7 +341,7 @@ async fn parquet() { let rows = Parquet::<_, AlltypesDictionaryDerived>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_dictionary.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -354,7 +354,7 @@ async fn parquet() { let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_dictionary.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -401,7 +401,7 @@ async fn parquet() { let rows = Parquet::<_, AlltypesPlain>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -414,7 +414,7 @@ async fn parquet() { let rows = Parquet::<_, AlltypesPlainDerived>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -427,7 +427,7 @@ async fn parquet() { let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -474,7 +474,7 @@ async fn parquet() { let rows = Parquet::<_, AlltypesPlainSnappy>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.snappy.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -487,7 +487,7 @@ async fn parquet() { let rows = Parquet::<_, AlltypesPlainSnappyDerived>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.snappy.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -500,7 +500,7 @@ async fn parquet() { let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.snappy.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -555,7 +555,7 @@ async fn parquet() { } let rows = Parquet::<_, NestedLists>::new(PathBuf::from( "amadeus-testing/parquet/nested_lists.snappy.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -568,7 +568,7 @@ async fn parquet() { let rows = Parquet::<_, NestedListsDerived>::new(PathBuf::from( "amadeus-testing/parquet/nested_lists.snappy.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -581,7 +581,7 @@ async fn parquet() { let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nested_lists.snappy.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -610,7 +610,7 @@ async fn parquet() { } let rows = Parquet::<_, NestedMaps>::new(PathBuf::from( "amadeus-testing/parquet/nested_maps.snappy.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -623,7 +623,7 @@ async fn parquet() { let rows = Parquet::<_, NestedMapsDerived>::new(PathBuf::from( "amadeus-testing/parquet/nested_maps.snappy.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -636,7 +636,7 @@ async fn parquet() { let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nested_maps.snappy.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -704,7 +704,7 @@ async fn parquet() { let rows = Parquet::<_, Nonnullable>::new(PathBuf::from( "amadeus-testing/parquet/nonnullable.impala.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -717,7 +717,7 @@ async fn parquet() { let rows = Parquet::<_, NonnullableDerived>::new(PathBuf::from( "amadeus-testing/parquet/nonnullable.impala.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -730,7 +730,7 @@ async fn parquet() { let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nonnullable.impala.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -777,7 +777,7 @@ async fn parquet() { } let rows = Parquet::<_, Nullable>::new(PathBuf::from( "amadeus-testing/parquet/nullable.impala.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -790,7 +790,7 @@ async fn parquet() { let rows = Parquet::<_, NullableDerived>::new(PathBuf::from( "amadeus-testing/parquet/nullable.impala.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -803,7 +803,7 @@ async fn parquet() { let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nullable.impala.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -826,7 +826,7 @@ async fn parquet() { } let rows = Parquet::<_, Nulls>::new(PathBuf::from( "amadeus-testing/parquet/nulls.snappy.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -839,7 +839,7 @@ async fn parquet() { let rows = Parquet::<_, NullsDerived>::new(PathBuf::from( "amadeus-testing/parquet/nulls.snappy.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -852,7 +852,7 @@ async fn parquet() { let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nulls.snappy.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -877,7 +877,7 @@ async fn parquet() { } let rows = Parquet::<_, Repeated>::new(PathBuf::from( "amadeus-testing/parquet/repeated_no_annotation.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -890,7 +890,7 @@ async fn parquet() { let rows = Parquet::<_, RepeatedDerived>::new(PathBuf::from( "amadeus-testing/parquet/repeated_no_annotation.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -903,7 +903,7 @@ async fn parquet() { let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/repeated_no_annotation.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -930,7 +930,7 @@ async fn parquet() { } let rows = Parquet::<_, TestDatapage>::new(PathBuf::from( "amadeus-testing/parquet/datapage_v2.snappy.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -943,7 +943,7 @@ async fn parquet() { let rows = Parquet::<_, TestDatapageDerived>::new(PathBuf::from( "amadeus-testing/parquet/datapage_v2.snappy.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -956,7 +956,7 @@ async fn parquet() { let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/datapage_v2.snappy.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -1000,7 +1000,7 @@ async fn parquet() { } let rows = - Parquet::<_, CommitsDerived>::new(PathBuf::from("amadeus-testing/parquet/commits.parquet")) + Parquet::<_, CommitsDerived>::new(PathBuf::from("amadeus-testing/parquet/commits.parquet"), None) .await .unwrap(); assert_eq!( @@ -1011,7 +1011,7 @@ async fn parquet() { 14_444 ); - let rows = Parquet::<_, Value>::new(PathBuf::from("amadeus-testing/parquet/commits.parquet")) + let rows = Parquet::<_, Value>::new(PathBuf::from("amadeus-testing/parquet/commits.parquet"), None) .await .unwrap(); assert_eq!( diff --git a/tests/parquet_dist.rs b/tests/parquet_dist.rs index b8e006a5..7f847fd9 100644 --- a/tests/parquet_dist.rs +++ b/tests/parquet_dist.rs @@ -43,7 +43,7 @@ async fn run(pool: &P) -> Duration { let rows = Parquet::<_, Value>::new(ParquetDirectory::new(PathBuf::from( "amadeus-testing/parquet/cf-accesslogs/", - ))) + )), None) .await .unwrap(); assert_eq!( @@ -56,7 +56,7 @@ async fn run(pool: &P) -> Duration { #[cfg(feature = "aws")] { - let rows = Parquet::<_, Value>::new(vec![S3File::new_with(AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/year=2018/month=11/day=03/part-00137-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet", AwsCredentials::Anonymous);20]).await.unwrap(); + let rows = Parquet::<_, Value>::new(vec![S3File::new_with(AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/year=2018/month=11/day=03/part-00137-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet", AwsCredentials::Anonymous);20], None).await.unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) @@ -70,7 +70,7 @@ async fn run(pool: &P) -> Duration { "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/", AwsCredentials::Anonymous, - ))) + )), None) .await .unwrap(); assert_eq!( @@ -90,7 +90,7 @@ async fn run(pool: &P) -> Duration { S3File::new_with(AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/year=2018/month=11/day=05/part-00025-96c249f4-3a10-4509-b6b8-693a5d90dbf3.c000.snappy.parquet", AwsCredentials::Anonymous), S3File::new_with(AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/year=2018/month=11/day=06/part-00185-96c249f4-3a10-4509-b6b8-693a5d90dbf3.c000.snappy.parquet", AwsCredentials::Anonymous), S3File::new_with(AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/year=2018/month=11/day=07/part-00151-96c249f4-3a10-4509-b6b8-693a5d90dbf3.c000.snappy.parquet", AwsCredentials::Anonymous), - ]).await.unwrap(); + ], None).await.unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) @@ -104,7 +104,7 @@ async fn run(pool: &P) -> Duration { "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/", AwsCredentials::Anonymous, - ))) + )), None) .await .unwrap(); assert_eq!( @@ -143,7 +143,7 @@ async fn run(pool: &P) -> Duration { } let rows = Parquet::<_, StockSimulatedDerived>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -156,7 +156,7 @@ async fn run(pool: &P) -> Duration { let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -179,7 +179,7 @@ async fn run(pool: &P) -> Duration { let rows = Parquet::<_, StockSimulatedDerivedProjection1>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -192,7 +192,7 @@ async fn run(pool: &P) -> Duration { let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -212,7 +212,7 @@ async fn run(pool: &P) -> Duration { let rows = Parquet::<_, StockSimulatedDerivedProjection2>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -225,7 +225,7 @@ async fn run(pool: &P) -> Duration { let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -264,7 +264,7 @@ async fn run(pool: &P) -> Duration { } let rows = - Parquet::<_, TenKayVeeTwo>::new(PathBuf::from("amadeus-testing/parquet/10k-v2.parquet")) + Parquet::<_, TenKayVeeTwo>::new(PathBuf::from("amadeus-testing/parquet/10k-v2.parquet"), None) .await .unwrap(); assert_eq!( @@ -277,7 +277,7 @@ async fn run(pool: &P) -> Duration { let rows = Parquet::<_, TenKayVeeTwoDerived>::new(PathBuf::from( "amadeus-testing/parquet/10k-v2.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -288,7 +288,7 @@ async fn run(pool: &P) -> Duration { 10_000 ); - let rows = Parquet::<_, Value>::new(PathBuf::from("amadeus-testing/parquet/10k-v2.parquet")) + let rows = Parquet::<_, Value>::new(PathBuf::from("amadeus-testing/parquet/10k-v2.parquet"), None) .await .unwrap(); assert_eq!( @@ -335,7 +335,7 @@ async fn run(pool: &P) -> Duration { let rows = Parquet::<_, AlltypesDictionary>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_dictionary.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -348,7 +348,7 @@ async fn run(pool: &P) -> Duration { let rows = Parquet::<_, AlltypesDictionaryDerived>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_dictionary.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -361,7 +361,7 @@ async fn run(pool: &P) -> Duration { let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_dictionary.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -408,7 +408,7 @@ async fn run(pool: &P) -> Duration { let rows = Parquet::<_, AlltypesPlain>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -421,7 +421,7 @@ async fn run(pool: &P) -> Duration { let rows = Parquet::<_, AlltypesPlainDerived>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -434,7 +434,7 @@ async fn run(pool: &P) -> Duration { let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -481,7 +481,7 @@ async fn run(pool: &P) -> Duration { let rows = Parquet::<_, AlltypesPlainSnappy>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.snappy.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -494,7 +494,7 @@ async fn run(pool: &P) -> Duration { let rows = Parquet::<_, AlltypesPlainSnappyDerived>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.snappy.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -507,7 +507,7 @@ async fn run(pool: &P) -> Duration { let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.snappy.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -562,7 +562,7 @@ async fn run(pool: &P) -> Duration { } let rows = Parquet::<_, NestedLists>::new(PathBuf::from( "amadeus-testing/parquet/nested_lists.snappy.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -575,7 +575,7 @@ async fn run(pool: &P) -> Duration { let rows = Parquet::<_, NestedListsDerived>::new(PathBuf::from( "amadeus-testing/parquet/nested_lists.snappy.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -588,7 +588,7 @@ async fn run(pool: &P) -> Duration { let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nested_lists.snappy.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -617,7 +617,7 @@ async fn run(pool: &P) -> Duration { } let rows = Parquet::<_, NestedMaps>::new(PathBuf::from( "amadeus-testing/parquet/nested_maps.snappy.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -630,7 +630,7 @@ async fn run(pool: &P) -> Duration { let rows = Parquet::<_, NestedMapsDerived>::new(PathBuf::from( "amadeus-testing/parquet/nested_maps.snappy.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -643,7 +643,7 @@ async fn run(pool: &P) -> Duration { let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nested_maps.snappy.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -711,7 +711,7 @@ async fn run(pool: &P) -> Duration { let rows = Parquet::<_, Nonnullable>::new(PathBuf::from( "amadeus-testing/parquet/nonnullable.impala.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -724,7 +724,7 @@ async fn run(pool: &P) -> Duration { let rows = Parquet::<_, NonnullableDerived>::new(PathBuf::from( "amadeus-testing/parquet/nonnullable.impala.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -737,7 +737,7 @@ async fn run(pool: &P) -> Duration { let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nonnullable.impala.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -784,7 +784,7 @@ async fn run(pool: &P) -> Duration { } let rows = Parquet::<_, Nullable>::new(PathBuf::from( "amadeus-testing/parquet/nullable.impala.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -797,7 +797,7 @@ async fn run(pool: &P) -> Duration { let rows = Parquet::<_, NullableDerived>::new(PathBuf::from( "amadeus-testing/parquet/nullable.impala.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -810,7 +810,7 @@ async fn run(pool: &P) -> Duration { let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nullable.impala.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -833,7 +833,7 @@ async fn run(pool: &P) -> Duration { } let rows = Parquet::<_, Nulls>::new(PathBuf::from( "amadeus-testing/parquet/nulls.snappy.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -846,7 +846,7 @@ async fn run(pool: &P) -> Duration { let rows = Parquet::<_, NullsDerived>::new(PathBuf::from( "amadeus-testing/parquet/nulls.snappy.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -859,7 +859,7 @@ async fn run(pool: &P) -> Duration { let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nulls.snappy.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -884,7 +884,7 @@ async fn run(pool: &P) -> Duration { } let rows = Parquet::<_, Repeated>::new(PathBuf::from( "amadeus-testing/parquet/repeated_no_annotation.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -897,7 +897,7 @@ async fn run(pool: &P) -> Duration { let rows = Parquet::<_, RepeatedDerived>::new(PathBuf::from( "amadeus-testing/parquet/repeated_no_annotation.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -910,7 +910,7 @@ async fn run(pool: &P) -> Duration { let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/repeated_no_annotation.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -937,7 +937,7 @@ async fn run(pool: &P) -> Duration { } let rows = Parquet::<_, TestDatapage>::new(PathBuf::from( "amadeus-testing/parquet/datapage_v2.snappy.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -950,7 +950,7 @@ async fn run(pool: &P) -> Duration { let rows = Parquet::<_, TestDatapageDerived>::new(PathBuf::from( "amadeus-testing/parquet/datapage_v2.snappy.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -963,7 +963,7 @@ async fn run(pool: &P) -> Duration { let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/datapage_v2.snappy.parquet", - )) + ), None) .await .unwrap(); assert_eq!( @@ -1007,7 +1007,7 @@ async fn run(pool: &P) -> Duration { } let rows = - Parquet::<_, CommitsDerived>::new(PathBuf::from("amadeus-testing/parquet/commits.parquet")) + Parquet::<_, CommitsDerived>::new(PathBuf::from("amadeus-testing/parquet/commits.parquet"), None) .await .unwrap(); assert_eq!( @@ -1018,7 +1018,7 @@ async fn run(pool: &P) -> Duration { 14_444 ); - let rows = Parquet::<_, Value>::new(PathBuf::from("amadeus-testing/parquet/commits.parquet")) + let rows = Parquet::<_, Value>::new(PathBuf::from("amadeus-testing/parquet/commits.parquet"), None) .await .unwrap(); assert_eq!( From dab12e309f9f209f2e4748d453b0dbf56b7c7107 Mon Sep 17 00:00:00 2001 From: Robin Bernon Date: Thu, 25 Mar 2021 12:20:07 +0000 Subject: [PATCH 02/11] Inputing predicate to SerializedFileReader to allow for dynamic reads. --- Cargo.toml | 2 +- amadeus-derive/src/lib.rs | 10 +- amadeus-parquet/Cargo.toml | 2 +- amadeus-parquet/src/internal/record/impls.rs | 2 +- amadeus-parquet/src/internal/record/mod.rs | 6 +- .../src/internal/record/predicates.rs | 12 +- amadeus-parquet/src/internal/record/reader.rs | 4 +- .../src/internal/record/schemas.rs | 2 +- amadeus-parquet/src/lib.rs | 77 ++-- amadeus-serde/Cargo.toml | 2 +- amadeus-serde/src/impls.rs | 2 +- amadeus-types/Cargo.toml | 2 +- amadeus-types/src/group.rs | 2 +- amadeus-types/src/value.rs | 2 +- tests/derive.rs | 14 - tests/parquet.rs | 391 +++++++++++------- tests/parquet_dist.rs | 334 ++++++++------- 17 files changed, 517 insertions(+), 349 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 690140b1..775bf36c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ maintenance = { status = "actively-developed" } constellation = ["bincode", "constellation-rs", "serde_traitobject"] aws = ["amadeus-aws"] commoncrawl = ["amadeus-commoncrawl"] -parquet = ["amadeus-parquet", "amadeus-derive/parquet"] +parquet = ["amadeus-parquet", "amadeus-derive/parquet", "amadeus-serde", "amadeus-derive/serde"] postgres = ["amadeus-postgres", "amadeus-derive/postgres"] csv = ["amadeus-serde", "amadeus-derive/serde"] json = ["amadeus-serde", "amadeus-derive/serde"] diff --git a/amadeus-derive/src/lib.rs b/amadeus-derive/src/lib.rs index 91ae2389..7299d6b6 100644 --- a/amadeus-derive/src/lib.rs +++ b/amadeus-derive/src/lib.rs @@ -267,6 +267,14 @@ fn impl_struct( }; }); + let predicate_derives = if cfg!(feature = "serde") { + Some(quote! { + #[derive(Clone, Debug, __::Serialize, __::Deserialize)] + }) + } else { + None + }; + parquet_derives = Some(quote! { #visibility struct #schema_name #impl_generics #where_clause_with_parquet_data { #(#field_names1: <#field_types1 as __::ParquetData>::Schema,)* @@ -300,7 +308,7 @@ fn impl_struct( #visibility struct #reader_name #impl_generics #where_clause_with_parquet_data { #(#field_names1: <#field_types1 as __::ParquetData>::Reader,)* } - #[derive(Clone, Debug)] + #predicate_derives #visibility struct #predicate_name #impl_generics #where_clause_with_parquet_data { #(#field_names1: __::Option<<#field_types1 as __::ParquetData>::Predicate>,)* } diff --git a/amadeus-parquet/Cargo.toml b/amadeus-parquet/Cargo.toml index da3151f8..14afc1a9 100644 --- a/amadeus-parquet/Cargo.toml +++ b/amadeus-parquet/Cargo.toml @@ -29,7 +29,7 @@ educe = "0.4" flate2 = { version = "1.0.2", features = ["rust_backend"], default-features = false } futures = "0.3" fxhash = "0.2" -hashlink = "0.5" +hashlink = { version = "0.6", features = ["serde_impl"] } lz-fear = "0.1" num-bigint = "0.3" quick-error = "1.2.2" diff --git a/amadeus-parquet/src/internal/record/impls.rs b/amadeus-parquet/src/internal/record/impls.rs index 6b5e2e89..4604196b 100644 --- a/amadeus-parquet/src/internal/record/impls.rs +++ b/amadeus-parquet/src/internal/record/impls.rs @@ -1,4 +1,4 @@ -use hashlink::LinkedHashMap; +use hashlink::linked_hash_map::LinkedHashMap; use std::{ any::type_name, collections::HashMap, convert::{TryFrom, TryInto}, fmt, hash::{BuildHasher, Hash}, marker::PhantomData, string::FromUtf8Error, sync::Arc }; diff --git a/amadeus-parquet/src/internal/record/mod.rs b/amadeus-parquet/src/internal/record/mod.rs index a12b38e5..38be1bc9 100644 --- a/amadeus-parquet/src/internal/record/mod.rs +++ b/amadeus-parquet/src/internal/record/mod.rs @@ -47,6 +47,7 @@ mod schemas; mod triplet; pub mod types; +use serde::{Deserialize, Serialize}; use std::{ collections::HashMap, fmt::{self, Debug} }; @@ -65,7 +66,8 @@ pub use schemas::RootSchema; mod predicate { /// This is for forward compatibility when Predicate pushdown and dynamic schemas are /// implemented. - #[derive(Clone, Debug)] + use serde::{Deserialize, Serialize}; + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct Predicate; } pub(crate) use self::predicate::Predicate; @@ -154,7 +156,7 @@ pub trait ParquetData: Data + Sized { // Clone + PartialEq + Debug + 'static type Schema: Schema; type Reader: Reader; - type Predicate: Clone + Debug; + type Predicate: Clone + Debug + Serialize + for<'de> Deserialize<'de> + Send + 'static; /// Parse a [`Type`] into `Self::Schema`, using `repetition` instead of /// `Type::get_basic_info().repetition()`. A `repetition` of `None` denotes a root diff --git a/amadeus-parquet/src/internal/record/predicates.rs b/amadeus-parquet/src/internal/record/predicates.rs index e0adffd8..f7852e06 100644 --- a/amadeus-parquet/src/internal/record/predicates.rs +++ b/amadeus-parquet/src/internal/record/predicates.rs @@ -1,12 +1,12 @@ -use fxhash::FxBuildHasher; -use hashlink::LinkedHashMap; +use hashlink::linked_hash_map::LinkedHashMap; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; use amadeus_types::{Bson, Date, DateTime, Decimal, Enum, Group, Json, List, Time, Value}; use crate::internal::record::ParquetData; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] /// Predicate for [`Group`]s pub struct MapPredicate { pub(super) key: Option, @@ -18,11 +18,11 @@ impl MapPredicate { } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] /// Predicate for [`Group`]s pub struct GroupPredicate( /// Map of field names to predicates for the fields in the group - pub(super) LinkedHashMap::Predicate>, FxBuildHasher>, + pub(super) LinkedHashMap::Predicate>>, ); impl GroupPredicate { pub fn new(fields: I) -> Self @@ -33,7 +33,7 @@ impl GroupPredicate { } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] /// Predicate for [`Value`]s pub enum ValuePredicate { Bool(Option<::Predicate>), diff --git a/amadeus-parquet/src/internal/record/reader.rs b/amadeus-parquet/src/internal/record/reader.rs index da07663c..6118ee22 100644 --- a/amadeus-parquet/src/internal/record/reader.rs +++ b/amadeus-parquet/src/internal/record/reader.rs @@ -24,7 +24,7 @@ //! that are optional or repeated. use fxhash::FxBuildHasher; -use hashlink::LinkedHashMap; +use hashlink::linked_hash_map::LinkedHashMap; use std::{ collections::HashMap, convert::TryInto, error::Error, marker::PhantomData, mem, sync::Arc }; @@ -948,7 +948,7 @@ where mod tests { use super::*; - use hashlink::LinkedHashMap; + use hashlink::linked_hash_map::LinkedHashMap; use std::{collections::HashMap, sync::Arc}; use crate::internal::{ diff --git a/amadeus-parquet/src/internal/record/schemas.rs b/amadeus-parquet/src/internal/record/schemas.rs index beef783b..0190a75e 100644 --- a/amadeus-parquet/src/internal/record/schemas.rs +++ b/amadeus-parquet/src/internal/record/schemas.rs @@ -31,7 +31,7 @@ //! ``` use fxhash::FxBuildHasher; -use hashlink::LinkedHashMap; +use hashlink::linked_hash_map::LinkedHashMap; use std::{ fmt::{self, Debug, Display}, marker::PhantomData, mem, str::FromStr }; diff --git a/amadeus-parquet/src/lib.rs b/amadeus-parquet/src/lib.rs index e283218c..40f8259f 100644 --- a/amadeus-parquet/src/lib.rs +++ b/amadeus-parquet/src/lib.rs @@ -60,7 +60,9 @@ mod wrap { file::{Directory, File, Page, Partition, PathBuf}, into_par_stream::IntoDistributedStream, par_stream::DistributedStream, util::{DistParStream, ResultExpandIter}, Source }; - pub use internal::record::ParquetData; + pub use internal::record::{ + predicates::{GroupPredicate, ValuePredicate}, ParquetData + }; #[doc(hidden)] pub mod derive { @@ -79,7 +81,7 @@ mod wrap { Row: ParquetData, { partitions: Vec, - row_predicate: Option, + row_predicate: Option, marker: PhantomData Row>, } impl Parquet @@ -87,10 +89,12 @@ mod wrap { F: File, Row: ParquetData + 'static, { - pub async fn new(file: F, row_predicate: Option) -> Result::Error> { + pub async fn new( + file: F, row_predicate: Option, + ) -> Result::Error> { Ok(Self { partitions: file.partitions().await.map_err(ParquetError::File)?, - row_predicate, + row_predicate, marker: PhantomData, }) } @@ -117,38 +121,45 @@ mod wrap { } #[allow(clippy::let_and_return)] fn dist_stream(self) -> Self::DistStream { - let predicate = self.row_predicate.clone(); + let predicate = self.row_predicate; self.partitions .into_dist_stream() - .flat_map(FnMut!(|partition: F::Partition| async move { - Ok(stream::iter( - partition - .pages() - .await - .map_err(ParquetError::Partition)? - .into_iter(), - ) - .flat_map(|page| { - async move { - let mut buf = Vec::with_capacity(10 * 1024 * 1024); - let reader = Page::reader(page); - pin_mut!(reader); - let buf = PassError::new( - reader.read_to_end(&mut buf).await.map(|_| Cursor::new(buf)), - ); - Ok(stream::iter( - SerializedFileReader::new(buf)?.get_row_iter::(None)?, - )) - } - .map(ResultExpandIter::new) - .flatten_stream() - }) - .map(|row: Result, Self::Error>| Ok(row??))) - } - .map(ResultExpandIter::new) - .flatten_stream() - .map(|row: Result, Self::Error>| Ok(row??)))) + .flat_map(FnMut!(move |partition: F::Partition| { + let predicate = predicate.clone(); + + async move { + Ok(stream::iter( + partition + .pages() + .await + .map_err(ParquetError::Partition)? + .into_iter(), + ) + .flat_map(move |page| { + let predicate = predicate.clone(); + async move { + let mut buf = Vec::with_capacity(10 * 1024 * 1024); + let reader = Page::reader(page); + pin_mut!(reader); + let buf = PassError::new( + reader.read_to_end(&mut buf).await.map(|_| Cursor::new(buf)), + ); + + Ok(stream::iter( + SerializedFileReader::new(buf)? + .get_row_iter::(predicate)?, + )) + } + .map(ResultExpandIter::new) + .flatten_stream() + }) + .map(|row: Result, Self::Error>| Ok(row??))) + } + .map(ResultExpandIter::new) + .flatten_stream() + .map(|row: Result, Self::Error>| Ok(row??)) + })) } } diff --git a/amadeus-serde/Cargo.toml b/amadeus-serde/Cargo.toml index 7a9f5a85..4563bccb 100644 --- a/amadeus-serde/Cargo.toml +++ b/amadeus-serde/Cargo.toml @@ -25,7 +25,7 @@ chrono = { version = "0.4", default-features = false, features = ["serde"] } csv = "1.0" educe = "0.4" futures = "0.3" -hashlink = "0.5" +hashlink = "0.6" serde = { version = "1.0", features = ["derive"] } serde_bytes = "0.11" serde_closure = "0.3" diff --git a/amadeus-serde/src/impls.rs b/amadeus-serde/src/impls.rs index 07e6e7bf..8e434ca8 100644 --- a/amadeus-serde/src/impls.rs +++ b/amadeus-serde/src/impls.rs @@ -1,6 +1,6 @@ #![allow(clippy::too_many_lines)] -use hashlink::LinkedHashMap; +use hashlink::linked_hash_map::LinkedHashMap; use recycle::VecExt; use serde::{ de::{self, MapAccess, SeqAccess, Visitor}, ser::{SerializeSeq, SerializeStruct, SerializeTupleStruct}, Deserializer, Serializer diff --git a/amadeus-types/Cargo.toml b/amadeus-types/Cargo.toml index b066f023..301d9b14 100644 --- a/amadeus-types/Cargo.toml +++ b/amadeus-types/Cargo.toml @@ -23,7 +23,7 @@ amadeus-core = { version = "=0.4.2", path = "../amadeus-core" } chrono = { version = "0.4", default-features = false, features = ["std", "serde"] } chrono-tz = { version = "0.5", features = ["serde"] } fxhash = "0.2" -hashlink = "0.5" +hashlink = "0.6" once_cell = "1.0" ordered-float = "2.0" serde = { version = "1.0", features = ["derive"] } diff --git a/amadeus-types/src/group.rs b/amadeus-types/src/group.rs index 626316dd..c831a985 100644 --- a/amadeus-types/src/group.rs +++ b/amadeus-types/src/group.rs @@ -1,7 +1,7 @@ //! Implement [`Record`] for [`Group`] aka [`Row`]. use fxhash::FxBuildHasher; -use hashlink::LinkedHashMap; +use hashlink::linked_hash_map::LinkedHashMap; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::{ cmp::Ordering, fmt::{self, Debug}, ops::Index, slice::SliceIndex, str, sync::Arc diff --git a/amadeus-types/src/value.rs b/amadeus-types/src/value.rs index 48784ae7..cd8d4077 100644 --- a/amadeus-types/src/value.rs +++ b/amadeus-types/src/value.rs @@ -3,7 +3,7 @@ #![allow(clippy::type_complexity)] use fxhash::FxBuildHasher; -use hashlink::LinkedHashMap; +use hashlink::linked_hash_map::LinkedHashMap; use recycle::VecExt; use serde::{de::Deserializer, ser::Serializer, Deserialize, Serialize}; use std::{ diff --git a/tests/derive.rs b/tests/derive.rs index f014df36..8ed35c31 100644 --- a/tests/derive.rs +++ b/tests/derive.rs @@ -1,11 +1,6 @@ use amadeus::prelude::*; use serde::{Deserialize, Serialize}; -#[derive(Data, Clone, PartialEq, Debug)] -struct GenericRow { - t: G, -} - #[derive(Data, Clone, PartialEq, Serialize, Deserialize, Debug)] struct Row { a: String, @@ -33,12 +28,3 @@ fn list() { let rows2 = serde_json::from_str(&*json).unwrap(); assert_eq!(rows, rows2); } - -mod no_prelude { - #![no_implicit_prelude] - - #[derive(::amadeus::prelude::Data, Clone, PartialEq, Debug)] - struct GenericRow { - t: G, - } -} diff --git a/tests/parquet.rs b/tests/parquet.rs index 0259def0..b69bacda 100644 --- a/tests/parquet.rs +++ b/tests/parquet.rs @@ -9,6 +9,59 @@ use std::{collections::HashMap, path::PathBuf, time::SystemTime}; use amadeus::prelude::*; +use amadeus::amadeus_parquet::GroupPredicate; +use amadeus_parquet::ValuePredicate; + +fn assert_columns_in_row( + expected_column_names: &Vec, row: Result, +) -> Result { + let row_unwrapped = row.ok().unwrap(); + let row_group = row_unwrapped.clone().into_group().ok().unwrap(); + let field_names = row_group.field_names().unwrap(); + + // Adds extra side check to make sure that order is also maintained. + for (expected_column_name, (actual_column_name, _)) in + expected_column_names.iter().zip(field_names.iter()) + { + assert_eq!(expected_column_name, actual_column_name); + } + + Ok(row_unwrapped) +} + +fn get_row_predicate(column_names: Vec) -> Option { + Some(ValuePredicate::Group(Some(GroupPredicate::new( + column_names.into_iter().zip(None.into_iter()), + )))) +} + +#[tokio::test(threaded_scheduler)] +#[cfg_attr(miri, ignore)] +async fn dynamic_reads() { + let start = SystemTime::now(); + + let pool = &ThreadPool::new(None).unwrap(); + + let column_names = vec!["uri".to_string(), "location".to_string()]; + + #[cfg(feature = "aws")] + { + let rows = Parquet::<_, Value>::new(vec![ + S3File::new_with(AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/year=2018/month=11/day=02/part-00176-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet", AwsCredentials::Anonymous), + ], get_row_predicate(column_names.clone())).await.unwrap(); + + assert_eq!( + rows.par_stream() + .map(move |row: Result| assert_columns_in_row(&column_names, row)) + .count(pool) + .await, + 4172 + ); + } + + println!("in {:?}", start.elapsed().unwrap()); +} + #[tokio::test(threaded_scheduler)] #[cfg_attr(miri, ignore)] async fn parquet() { @@ -34,9 +87,10 @@ async fn parquet() { 207_535 ); - let rows = Parquet::<_, Value>::new(ParquetDirectory::new(PathBuf::from( - "amadeus-testing/parquet/cf-accesslogs/", - )), None) + let rows = Parquet::<_, Value>::new( + ParquetDirectory::new(PathBuf::from("amadeus-testing/parquet/cf-accesslogs/")), + None, + ) .await .unwrap(); assert_eq!( @@ -47,7 +101,7 @@ async fn parquet() { 207_535 ); - #[cfg(feature = "aws")] + #[cfg(feature = "aws")] { let rows = Parquet::<_, Value>::new(vec![S3File::new_with(AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/year=2018/month=11/day=03/part-00137-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet", AwsCredentials::Anonymous);20], None).await.unwrap(); assert_eq!( @@ -58,12 +112,15 @@ async fn parquet() { 45_167 * 20 ); - let rows = Parquet::<_, Value>::new(ParquetDirectory::new(S3Directory::new_with( - AwsRegion::UsEast1, - "us-east-1.data-analytics", - "cflogworkshop/optimized/cf-accesslogs/", - AwsCredentials::Anonymous, - )), None) + let rows = Parquet::<_, Value>::new( + ParquetDirectory::new(S3Directory::new_with( + AwsRegion::UsEast1, + "us-east-1.data-analytics", + "cflogworkshop/optimized/cf-accesslogs/", + AwsCredentials::Anonymous, + )), + None, + ) .await .unwrap(); assert_eq!( @@ -92,12 +149,15 @@ async fn parquet() { 207_535 ); - let rows = Parquet::<_, Value>::new(ParquetDirectory::new(S3Directory::new_with( - AwsRegion::UsEast1, - "us-east-1.data-analytics", - "cflogworkshop/optimized/cf-accesslogs/", - AwsCredentials::Anonymous, - )), None) + let rows = Parquet::<_, Value>::new( + ParquetDirectory::new(S3Directory::new_with( + AwsRegion::UsEast1, + "us-east-1.data-analytics", + "cflogworkshop/optimized/cf-accesslogs/", + AwsCredentials::Anonymous, + )), + None, + ) .await .unwrap(); assert_eq!( @@ -134,9 +194,10 @@ async fn parquet() { valid: Option, __index_level_0__: Option, } - let rows = Parquet::<_, StockSimulatedDerived>::new(PathBuf::from( - "amadeus-testing/parquet/stock_simulated.parquet", - ), None) + let rows = Parquet::<_, StockSimulatedDerived>::new( + PathBuf::from("amadeus-testing/parquet/stock_simulated.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -147,9 +208,10 @@ async fn parquet() { 42_000 ); - let rows = Parquet::<_, Value>::new(PathBuf::from( - "amadeus-testing/parquet/stock_simulated.parquet", - ), None) + let rows = Parquet::<_, Value>::new( + PathBuf::from("amadeus-testing/parquet/stock_simulated.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -164,15 +226,16 @@ async fn parquet() { 42_000 ); - #[derive(Data, Clone, PartialEq, Debug)] + #[derive(Data, Clone, PartialEq, Debug)] struct StockSimulatedDerivedProjection1 { bs5: Option, __index_level_0__: Option, } - let rows = Parquet::<_, StockSimulatedDerivedProjection1>::new(PathBuf::from( - "amadeus-testing/parquet/stock_simulated.parquet", - ), None) + let rows = Parquet::<_, StockSimulatedDerivedProjection1>::new( + PathBuf::from("amadeus-testing/parquet/stock_simulated.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -183,9 +246,10 @@ async fn parquet() { 42_000 ); - let rows = Parquet::<_, Value>::new(PathBuf::from( - "amadeus-testing/parquet/stock_simulated.parquet", - ), None) + let rows = Parquet::<_, Value>::new( + PathBuf::from("amadeus-testing/parquet/stock_simulated.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -203,9 +267,10 @@ async fn parquet() { #[derive(Data, Clone, PartialEq, Debug)] struct StockSimulatedDerivedProjection2 {} - let rows = Parquet::<_, StockSimulatedDerivedProjection2>::new(PathBuf::from( - "amadeus-testing/parquet/stock_simulated.parquet", - ), None) + let rows = Parquet::<_, StockSimulatedDerivedProjection2>::new( + PathBuf::from("amadeus-testing/parquet/stock_simulated.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -216,9 +281,10 @@ async fn parquet() { 42_000 ); - let rows = Parquet::<_, Value>::new(PathBuf::from( - "amadeus-testing/parquet/stock_simulated.parquet", - ), None) + let rows = Parquet::<_, Value>::new( + PathBuf::from("amadeus-testing/parquet/stock_simulated.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -256,10 +322,12 @@ async fn parquet() { int96_field: DateTime, } - let rows = - Parquet::<_, TenKayVeeTwo>::new(PathBuf::from("amadeus-testing/parquet/10k-v2.parquet"), None) - .await - .unwrap(); + let rows = Parquet::<_, TenKayVeeTwo>::new( + PathBuf::from("amadeus-testing/parquet/10k-v2.parquet"), + None, + ) + .await + .unwrap(); assert_eq!( rows.par_stream() .map(|row: Result<_, _>| row.unwrap()) @@ -268,9 +336,10 @@ async fn parquet() { 10_000 ); - let rows = Parquet::<_, TenKayVeeTwoDerived>::new(PathBuf::from( - "amadeus-testing/parquet/10k-v2.parquet", - ), None) + let rows = Parquet::<_, TenKayVeeTwoDerived>::new( + PathBuf::from("amadeus-testing/parquet/10k-v2.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -281,9 +350,12 @@ async fn parquet() { 10_000 ); - let rows = Parquet::<_, Value>::new(PathBuf::from("amadeus-testing/parquet/10k-v2.parquet"), None) - .await - .unwrap(); + let rows = Parquet::<_, Value>::new( + PathBuf::from("amadeus-testing/parquet/10k-v2.parquet"), + None, + ) + .await + .unwrap(); assert_eq!( rows.par_stream() .map(|row: Result| -> Value { @@ -326,9 +398,10 @@ async fn parquet() { timestamp_col: Option, } - let rows = Parquet::<_, AlltypesDictionary>::new(PathBuf::from( - "amadeus-testing/parquet/alltypes_dictionary.parquet", - ), None) + let rows = Parquet::<_, AlltypesDictionary>::new( + PathBuf::from("amadeus-testing/parquet/alltypes_dictionary.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -339,9 +412,10 @@ async fn parquet() { 2 ); - let rows = Parquet::<_, AlltypesDictionaryDerived>::new(PathBuf::from( - "amadeus-testing/parquet/alltypes_dictionary.parquet", - ), None) + let rows = Parquet::<_, AlltypesDictionaryDerived>::new( + PathBuf::from("amadeus-testing/parquet/alltypes_dictionary.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -352,9 +426,10 @@ async fn parquet() { 2 ); - let rows = Parquet::<_, Value>::new(PathBuf::from( - "amadeus-testing/parquet/alltypes_dictionary.parquet", - ), None) + let rows = Parquet::<_, Value>::new( + PathBuf::from("amadeus-testing/parquet/alltypes_dictionary.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -399,9 +474,10 @@ async fn parquet() { timestamp_col: Option, } - let rows = Parquet::<_, AlltypesPlain>::new(PathBuf::from( - "amadeus-testing/parquet/alltypes_plain.parquet", - ), None) + let rows = Parquet::<_, AlltypesPlain>::new( + PathBuf::from("amadeus-testing/parquet/alltypes_plain.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -412,9 +488,10 @@ async fn parquet() { 8 ); - let rows = Parquet::<_, AlltypesPlainDerived>::new(PathBuf::from( - "amadeus-testing/parquet/alltypes_plain.parquet", - ), None) + let rows = Parquet::<_, AlltypesPlainDerived>::new( + PathBuf::from("amadeus-testing/parquet/alltypes_plain.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -425,9 +502,10 @@ async fn parquet() { 8 ); - let rows = Parquet::<_, Value>::new(PathBuf::from( - "amadeus-testing/parquet/alltypes_plain.parquet", - ), None) + let rows = Parquet::<_, Value>::new( + PathBuf::from("amadeus-testing/parquet/alltypes_plain.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -472,9 +550,10 @@ async fn parquet() { timestamp_col: Option, } - let rows = Parquet::<_, AlltypesPlainSnappy>::new(PathBuf::from( - "amadeus-testing/parquet/alltypes_plain.snappy.parquet", - ), None) + let rows = Parquet::<_, AlltypesPlainSnappy>::new( + PathBuf::from("amadeus-testing/parquet/alltypes_plain.snappy.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -485,9 +564,10 @@ async fn parquet() { 2 ); - let rows = Parquet::<_, AlltypesPlainSnappyDerived>::new(PathBuf::from( - "amadeus-testing/parquet/alltypes_plain.snappy.parquet", - ), None) + let rows = Parquet::<_, AlltypesPlainSnappyDerived>::new( + PathBuf::from("amadeus-testing/parquet/alltypes_plain.snappy.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -498,9 +578,10 @@ async fn parquet() { 2 ); - let rows = Parquet::<_, Value>::new(PathBuf::from( - "amadeus-testing/parquet/alltypes_plain.snappy.parquet", - ), None) + let rows = Parquet::<_, Value>::new( + PathBuf::from("amadeus-testing/parquet/alltypes_plain.snappy.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -553,9 +634,10 @@ async fn parquet() { a: Option>>>>>>, b: i32, } - let rows = Parquet::<_, NestedLists>::new(PathBuf::from( - "amadeus-testing/parquet/nested_lists.snappy.parquet", - ), None) + let rows = Parquet::<_, NestedLists>::new( + PathBuf::from("amadeus-testing/parquet/nested_lists.snappy.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -566,9 +648,10 @@ async fn parquet() { 3 ); - let rows = Parquet::<_, NestedListsDerived>::new(PathBuf::from( - "amadeus-testing/parquet/nested_lists.snappy.parquet", - ), None) + let rows = Parquet::<_, NestedListsDerived>::new( + PathBuf::from("amadeus-testing/parquet/nested_lists.snappy.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -579,9 +662,10 @@ async fn parquet() { 3 ); - let rows = Parquet::<_, Value>::new(PathBuf::from( - "amadeus-testing/parquet/nested_lists.snappy.parquet", - ), None) + let rows = Parquet::<_, Value>::new( + PathBuf::from("amadeus-testing/parquet/nested_lists.snappy.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -608,9 +692,10 @@ async fn parquet() { b: i32, c: f64, } - let rows = Parquet::<_, NestedMaps>::new(PathBuf::from( - "amadeus-testing/parquet/nested_maps.snappy.parquet", - ), None) + let rows = Parquet::<_, NestedMaps>::new( + PathBuf::from("amadeus-testing/parquet/nested_maps.snappy.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -621,9 +706,10 @@ async fn parquet() { 6 ); - let rows = Parquet::<_, NestedMapsDerived>::new(PathBuf::from( - "amadeus-testing/parquet/nested_maps.snappy.parquet", - ), None) + let rows = Parquet::<_, NestedMapsDerived>::new( + PathBuf::from("amadeus-testing/parquet/nested_maps.snappy.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -634,9 +720,10 @@ async fn parquet() { 6 ); - let rows = Parquet::<_, Value>::new(PathBuf::from( - "amadeus-testing/parquet/nested_maps.snappy.parquet", - ), None) + let rows = Parquet::<_, Value>::new( + PathBuf::from("amadeus-testing/parquet/nested_maps.snappy.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -702,9 +789,10 @@ async fn parquet() { f: String, } - let rows = Parquet::<_, Nonnullable>::new(PathBuf::from( - "amadeus-testing/parquet/nonnullable.impala.parquet", - ), None) + let rows = Parquet::<_, Nonnullable>::new( + PathBuf::from("amadeus-testing/parquet/nonnullable.impala.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -715,9 +803,10 @@ async fn parquet() { 1 ); - let rows = Parquet::<_, NonnullableDerived>::new(PathBuf::from( - "amadeus-testing/parquet/nonnullable.impala.parquet", - ), None) + let rows = Parquet::<_, NonnullableDerived>::new( + PathBuf::from("amadeus-testing/parquet/nonnullable.impala.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -728,9 +817,10 @@ async fn parquet() { 1 ); - let rows = Parquet::<_, Value>::new(PathBuf::from( - "amadeus-testing/parquet/nonnullable.impala.parquet", - ), None) + let rows = Parquet::<_, Value>::new( + PathBuf::from("amadeus-testing/parquet/nonnullable.impala.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -775,9 +865,10 @@ async fn parquet() { Option>>,)>,)>>>, )>, } - let rows = Parquet::<_, Nullable>::new(PathBuf::from( - "amadeus-testing/parquet/nullable.impala.parquet", - ), None) + let rows = Parquet::<_, Nullable>::new( + PathBuf::from("amadeus-testing/parquet/nullable.impala.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -788,9 +879,10 @@ async fn parquet() { 7 ); - let rows = Parquet::<_, NullableDerived>::new(PathBuf::from( - "amadeus-testing/parquet/nullable.impala.parquet", - ), None) + let rows = Parquet::<_, NullableDerived>::new( + PathBuf::from("amadeus-testing/parquet/nullable.impala.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -801,9 +893,10 @@ async fn parquet() { 7 ); - let rows = Parquet::<_, Value>::new(PathBuf::from( - "amadeus-testing/parquet/nullable.impala.parquet", - ), None) + let rows = Parquet::<_, Value>::new( + PathBuf::from("amadeus-testing/parquet/nullable.impala.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -824,9 +917,10 @@ async fn parquet() { struct NullsDerived { b_struct: Option<(Option,)>, } - let rows = Parquet::<_, Nulls>::new(PathBuf::from( - "amadeus-testing/parquet/nulls.snappy.parquet", - ), None) + let rows = Parquet::<_, Nulls>::new( + PathBuf::from("amadeus-testing/parquet/nulls.snappy.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -837,9 +931,10 @@ async fn parquet() { 8 ); - let rows = Parquet::<_, NullsDerived>::new(PathBuf::from( - "amadeus-testing/parquet/nulls.snappy.parquet", - ), None) + let rows = Parquet::<_, NullsDerived>::new( + PathBuf::from("amadeus-testing/parquet/nulls.snappy.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -850,9 +945,10 @@ async fn parquet() { 8 ); - let rows = Parquet::<_, Value>::new(PathBuf::from( - "amadeus-testing/parquet/nulls.snappy.parquet", - ), None) + let rows = Parquet::<_, Value>::new( + PathBuf::from("amadeus-testing/parquet/nulls.snappy.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -875,9 +971,10 @@ async fn parquet() { #[amadeus(name = "phoneNumbers")] phone_numbers: Option<(List<(i64, Option)>,)>, } - let rows = Parquet::<_, Repeated>::new(PathBuf::from( - "amadeus-testing/parquet/repeated_no_annotation.parquet", - ), None) + let rows = Parquet::<_, Repeated>::new( + PathBuf::from("amadeus-testing/parquet/repeated_no_annotation.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -888,9 +985,10 @@ async fn parquet() { 6 ); - let rows = Parquet::<_, RepeatedDerived>::new(PathBuf::from( - "amadeus-testing/parquet/repeated_no_annotation.parquet", - ), None) + let rows = Parquet::<_, RepeatedDerived>::new( + PathBuf::from("amadeus-testing/parquet/repeated_no_annotation.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -901,9 +999,10 @@ async fn parquet() { 6 ); - let rows = Parquet::<_, Value>::new(PathBuf::from( - "amadeus-testing/parquet/repeated_no_annotation.parquet", - ), None) + let rows = Parquet::<_, Value>::new( + PathBuf::from("amadeus-testing/parquet/repeated_no_annotation.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -928,9 +1027,10 @@ async fn parquet() { d: bool, e: Option>, } - let rows = Parquet::<_, TestDatapage>::new(PathBuf::from( - "amadeus-testing/parquet/datapage_v2.snappy.parquet", - ), None) + let rows = Parquet::<_, TestDatapage>::new( + PathBuf::from("amadeus-testing/parquet/datapage_v2.snappy.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -941,9 +1041,10 @@ async fn parquet() { 5 ); - let rows = Parquet::<_, TestDatapageDerived>::new(PathBuf::from( - "amadeus-testing/parquet/datapage_v2.snappy.parquet", - ), None) + let rows = Parquet::<_, TestDatapageDerived>::new( + PathBuf::from("amadeus-testing/parquet/datapage_v2.snappy.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -954,9 +1055,10 @@ async fn parquet() { 5 ); - let rows = Parquet::<_, Value>::new(PathBuf::from( - "amadeus-testing/parquet/datapage_v2.snappy.parquet", - ), None) + let rows = Parquet::<_, Value>::new( + PathBuf::from("amadeus-testing/parquet/datapage_v2.snappy.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -999,10 +1101,12 @@ async fn parquet() { __index_level_0__: Option, } - let rows = - Parquet::<_, CommitsDerived>::new(PathBuf::from("amadeus-testing/parquet/commits.parquet"), None) - .await - .unwrap(); + let rows = Parquet::<_, CommitsDerived>::new( + PathBuf::from("amadeus-testing/parquet/commits.parquet"), + None, + ) + .await + .unwrap(); assert_eq!( rows.par_stream() .map(|row: Result<_, _>| row.unwrap()) @@ -1011,9 +1115,12 @@ async fn parquet() { 14_444 ); - let rows = Parquet::<_, Value>::new(PathBuf::from("amadeus-testing/parquet/commits.parquet"), None) - .await - .unwrap(); + let rows = Parquet::<_, Value>::new( + PathBuf::from("amadeus-testing/parquet/commits.parquet"), + None, + ) + .await + .unwrap(); assert_eq!( rows.par_stream() .map(|row: Result| -> Value { diff --git a/tests/parquet_dist.rs b/tests/parquet_dist.rs index 7f847fd9..7db415e7 100644 --- a/tests/parquet_dist.rs +++ b/tests/parquet_dist.rs @@ -41,9 +41,10 @@ fn main() { async fn run(pool: &P) -> Duration { let start = SystemTime::now(); - let rows = Parquet::<_, Value>::new(ParquetDirectory::new(PathBuf::from( - "amadeus-testing/parquet/cf-accesslogs/", - )), None) + let rows = Parquet::<_, Value>::new( + ParquetDirectory::new(PathBuf::from("amadeus-testing/parquet/cf-accesslogs/")), + None, + ) .await .unwrap(); assert_eq!( @@ -65,12 +66,15 @@ async fn run(pool: &P) -> Duration { 45_167 * 20 ); - let rows = Parquet::<_, Value>::new(ParquetDirectory::new(S3Directory::new_with( - AwsRegion::UsEast1, - "us-east-1.data-analytics", - "cflogworkshop/optimized/cf-accesslogs/", - AwsCredentials::Anonymous, - )), None) + let rows = Parquet::<_, Value>::new( + ParquetDirectory::new(S3Directory::new_with( + AwsRegion::UsEast1, + "us-east-1.data-analytics", + "cflogworkshop/optimized/cf-accesslogs/", + AwsCredentials::Anonymous, + )), + None, + ) .await .unwrap(); assert_eq!( @@ -99,12 +103,15 @@ async fn run(pool: &P) -> Duration { 207_535 ); - let rows = Parquet::<_, Value>::new(ParquetDirectory::new(S3Directory::new_with( - AwsRegion::UsEast1, - "us-east-1.data-analytics", - "cflogworkshop/optimized/cf-accesslogs/", - AwsCredentials::Anonymous, - )), None) + let rows = Parquet::<_, Value>::new( + ParquetDirectory::new(S3Directory::new_with( + AwsRegion::UsEast1, + "us-east-1.data-analytics", + "cflogworkshop/optimized/cf-accesslogs/", + AwsCredentials::Anonymous, + )), + None, + ) .await .unwrap(); assert_eq!( @@ -141,9 +148,10 @@ async fn run(pool: &P) -> Duration { valid: Option, __index_level_0__: Option, } - let rows = Parquet::<_, StockSimulatedDerived>::new(PathBuf::from( - "amadeus-testing/parquet/stock_simulated.parquet", - ), None) + let rows = Parquet::<_, StockSimulatedDerived>::new( + PathBuf::from("amadeus-testing/parquet/stock_simulated.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -154,9 +162,10 @@ async fn run(pool: &P) -> Duration { 42_000 ); - let rows = Parquet::<_, Value>::new(PathBuf::from( - "amadeus-testing/parquet/stock_simulated.parquet", - ), None) + let rows = Parquet::<_, Value>::new( + PathBuf::from("amadeus-testing/parquet/stock_simulated.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -177,9 +186,10 @@ async fn run(pool: &P) -> Duration { __index_level_0__: Option, } - let rows = Parquet::<_, StockSimulatedDerivedProjection1>::new(PathBuf::from( - "amadeus-testing/parquet/stock_simulated.parquet", - ), None) + let rows = Parquet::<_, StockSimulatedDerivedProjection1>::new( + PathBuf::from("amadeus-testing/parquet/stock_simulated.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -190,9 +200,10 @@ async fn run(pool: &P) -> Duration { 42_000 ); - let rows = Parquet::<_, Value>::new(PathBuf::from( - "amadeus-testing/parquet/stock_simulated.parquet", - ), None) + let rows = Parquet::<_, Value>::new( + PathBuf::from("amadeus-testing/parquet/stock_simulated.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -210,9 +221,10 @@ async fn run(pool: &P) -> Duration { #[derive(Data, Clone, PartialEq, Debug)] struct StockSimulatedDerivedProjection2 {} - let rows = Parquet::<_, StockSimulatedDerivedProjection2>::new(PathBuf::from( - "amadeus-testing/parquet/stock_simulated.parquet", - ), None) + let rows = Parquet::<_, StockSimulatedDerivedProjection2>::new( + PathBuf::from("amadeus-testing/parquet/stock_simulated.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -223,9 +235,10 @@ async fn run(pool: &P) -> Duration { 42_000 ); - let rows = Parquet::<_, Value>::new(PathBuf::from( - "amadeus-testing/parquet/stock_simulated.parquet", - ), None) + let rows = Parquet::<_, Value>::new( + PathBuf::from("amadeus-testing/parquet/stock_simulated.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -263,10 +276,12 @@ async fn run(pool: &P) -> Duration { int96_field: DateTime, } - let rows = - Parquet::<_, TenKayVeeTwo>::new(PathBuf::from("amadeus-testing/parquet/10k-v2.parquet"), None) - .await - .unwrap(); + let rows = Parquet::<_, TenKayVeeTwo>::new( + PathBuf::from("amadeus-testing/parquet/10k-v2.parquet"), + None, + ) + .await + .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) @@ -275,9 +290,10 @@ async fn run(pool: &P) -> Duration { 10_000 ); - let rows = Parquet::<_, TenKayVeeTwoDerived>::new(PathBuf::from( - "amadeus-testing/parquet/10k-v2.parquet", - ), None) + let rows = Parquet::<_, TenKayVeeTwoDerived>::new( + PathBuf::from("amadeus-testing/parquet/10k-v2.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -288,9 +304,12 @@ async fn run(pool: &P) -> Duration { 10_000 ); - let rows = Parquet::<_, Value>::new(PathBuf::from("amadeus-testing/parquet/10k-v2.parquet"), None) - .await - .unwrap(); + let rows = Parquet::<_, Value>::new( + PathBuf::from("amadeus-testing/parquet/10k-v2.parquet"), + None, + ) + .await + .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result| -> Value { @@ -333,9 +352,10 @@ async fn run(pool: &P) -> Duration { timestamp_col: Option, } - let rows = Parquet::<_, AlltypesDictionary>::new(PathBuf::from( - "amadeus-testing/parquet/alltypes_dictionary.parquet", - ), None) + let rows = Parquet::<_, AlltypesDictionary>::new( + PathBuf::from("amadeus-testing/parquet/alltypes_dictionary.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -346,9 +366,10 @@ async fn run(pool: &P) -> Duration { 2 ); - let rows = Parquet::<_, AlltypesDictionaryDerived>::new(PathBuf::from( - "amadeus-testing/parquet/alltypes_dictionary.parquet", - ), None) + let rows = Parquet::<_, AlltypesDictionaryDerived>::new( + PathBuf::from("amadeus-testing/parquet/alltypes_dictionary.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -359,9 +380,10 @@ async fn run(pool: &P) -> Duration { 2 ); - let rows = Parquet::<_, Value>::new(PathBuf::from( - "amadeus-testing/parquet/alltypes_dictionary.parquet", - ), None) + let rows = Parquet::<_, Value>::new( + PathBuf::from("amadeus-testing/parquet/alltypes_dictionary.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -406,9 +428,10 @@ async fn run(pool: &P) -> Duration { timestamp_col: Option, } - let rows = Parquet::<_, AlltypesPlain>::new(PathBuf::from( - "amadeus-testing/parquet/alltypes_plain.parquet", - ), None) + let rows = Parquet::<_, AlltypesPlain>::new( + PathBuf::from("amadeus-testing/parquet/alltypes_plain.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -419,9 +442,10 @@ async fn run(pool: &P) -> Duration { 8 ); - let rows = Parquet::<_, AlltypesPlainDerived>::new(PathBuf::from( - "amadeus-testing/parquet/alltypes_plain.parquet", - ), None) + let rows = Parquet::<_, AlltypesPlainDerived>::new( + PathBuf::from("amadeus-testing/parquet/alltypes_plain.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -432,9 +456,10 @@ async fn run(pool: &P) -> Duration { 8 ); - let rows = Parquet::<_, Value>::new(PathBuf::from( - "amadeus-testing/parquet/alltypes_plain.parquet", - ), None) + let rows = Parquet::<_, Value>::new( + PathBuf::from("amadeus-testing/parquet/alltypes_plain.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -479,9 +504,10 @@ async fn run(pool: &P) -> Duration { timestamp_col: Option, } - let rows = Parquet::<_, AlltypesPlainSnappy>::new(PathBuf::from( - "amadeus-testing/parquet/alltypes_plain.snappy.parquet", - ), None) + let rows = Parquet::<_, AlltypesPlainSnappy>::new( + PathBuf::from("amadeus-testing/parquet/alltypes_plain.snappy.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -492,9 +518,10 @@ async fn run(pool: &P) -> Duration { 2 ); - let rows = Parquet::<_, AlltypesPlainSnappyDerived>::new(PathBuf::from( - "amadeus-testing/parquet/alltypes_plain.snappy.parquet", - ), None) + let rows = Parquet::<_, AlltypesPlainSnappyDerived>::new( + PathBuf::from("amadeus-testing/parquet/alltypes_plain.snappy.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -505,9 +532,10 @@ async fn run(pool: &P) -> Duration { 2 ); - let rows = Parquet::<_, Value>::new(PathBuf::from( - "amadeus-testing/parquet/alltypes_plain.snappy.parquet", - ), None) + let rows = Parquet::<_, Value>::new( + PathBuf::from("amadeus-testing/parquet/alltypes_plain.snappy.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -560,9 +588,10 @@ async fn run(pool: &P) -> Duration { a: Option>>>>>>, b: i32, } - let rows = Parquet::<_, NestedLists>::new(PathBuf::from( - "amadeus-testing/parquet/nested_lists.snappy.parquet", - ), None) + let rows = Parquet::<_, NestedLists>::new( + PathBuf::from("amadeus-testing/parquet/nested_lists.snappy.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -573,9 +602,10 @@ async fn run(pool: &P) -> Duration { 3 ); - let rows = Parquet::<_, NestedListsDerived>::new(PathBuf::from( - "amadeus-testing/parquet/nested_lists.snappy.parquet", - ), None) + let rows = Parquet::<_, NestedListsDerived>::new( + PathBuf::from("amadeus-testing/parquet/nested_lists.snappy.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -586,9 +616,10 @@ async fn run(pool: &P) -> Duration { 3 ); - let rows = Parquet::<_, Value>::new(PathBuf::from( - "amadeus-testing/parquet/nested_lists.snappy.parquet", - ), None) + let rows = Parquet::<_, Value>::new( + PathBuf::from("amadeus-testing/parquet/nested_lists.snappy.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -615,9 +646,10 @@ async fn run(pool: &P) -> Duration { b: i32, c: f64, } - let rows = Parquet::<_, NestedMaps>::new(PathBuf::from( - "amadeus-testing/parquet/nested_maps.snappy.parquet", - ), None) + let rows = Parquet::<_, NestedMaps>::new( + PathBuf::from("amadeus-testing/parquet/nested_maps.snappy.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -628,9 +660,10 @@ async fn run(pool: &P) -> Duration { 6 ); - let rows = Parquet::<_, NestedMapsDerived>::new(PathBuf::from( - "amadeus-testing/parquet/nested_maps.snappy.parquet", - ), None) + let rows = Parquet::<_, NestedMapsDerived>::new( + PathBuf::from("amadeus-testing/parquet/nested_maps.snappy.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -641,9 +674,10 @@ async fn run(pool: &P) -> Duration { 6 ); - let rows = Parquet::<_, Value>::new(PathBuf::from( - "amadeus-testing/parquet/nested_maps.snappy.parquet", - ), None) + let rows = Parquet::<_, Value>::new( + PathBuf::from("amadeus-testing/parquet/nested_maps.snappy.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -709,9 +743,10 @@ async fn run(pool: &P) -> Duration { f: String, } - let rows = Parquet::<_, Nonnullable>::new(PathBuf::from( - "amadeus-testing/parquet/nonnullable.impala.parquet", - ), None) + let rows = Parquet::<_, Nonnullable>::new( + PathBuf::from("amadeus-testing/parquet/nonnullable.impala.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -722,9 +757,10 @@ async fn run(pool: &P) -> Duration { 1 ); - let rows = Parquet::<_, NonnullableDerived>::new(PathBuf::from( - "amadeus-testing/parquet/nonnullable.impala.parquet", - ), None) + let rows = Parquet::<_, NonnullableDerived>::new( + PathBuf::from("amadeus-testing/parquet/nonnullable.impala.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -735,9 +771,10 @@ async fn run(pool: &P) -> Duration { 1 ); - let rows = Parquet::<_, Value>::new(PathBuf::from( - "amadeus-testing/parquet/nonnullable.impala.parquet", - ), None) + let rows = Parquet::<_, Value>::new( + PathBuf::from("amadeus-testing/parquet/nonnullable.impala.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -782,9 +819,10 @@ async fn run(pool: &P) -> Duration { Option>>,)>,)>>>, )>, } - let rows = Parquet::<_, Nullable>::new(PathBuf::from( - "amadeus-testing/parquet/nullable.impala.parquet", - ), None) + let rows = Parquet::<_, Nullable>::new( + PathBuf::from("amadeus-testing/parquet/nullable.impala.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -795,9 +833,10 @@ async fn run(pool: &P) -> Duration { 7 ); - let rows = Parquet::<_, NullableDerived>::new(PathBuf::from( - "amadeus-testing/parquet/nullable.impala.parquet", - ), None) + let rows = Parquet::<_, NullableDerived>::new( + PathBuf::from("amadeus-testing/parquet/nullable.impala.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -808,9 +847,10 @@ async fn run(pool: &P) -> Duration { 7 ); - let rows = Parquet::<_, Value>::new(PathBuf::from( - "amadeus-testing/parquet/nullable.impala.parquet", - ), None) + let rows = Parquet::<_, Value>::new( + PathBuf::from("amadeus-testing/parquet/nullable.impala.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -831,9 +871,10 @@ async fn run(pool: &P) -> Duration { struct NullsDerived { b_struct: Option<(Option,)>, } - let rows = Parquet::<_, Nulls>::new(PathBuf::from( - "amadeus-testing/parquet/nulls.snappy.parquet", - ), None) + let rows = Parquet::<_, Nulls>::new( + PathBuf::from("amadeus-testing/parquet/nulls.snappy.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -844,9 +885,10 @@ async fn run(pool: &P) -> Duration { 8 ); - let rows = Parquet::<_, NullsDerived>::new(PathBuf::from( - "amadeus-testing/parquet/nulls.snappy.parquet", - ), None) + let rows = Parquet::<_, NullsDerived>::new( + PathBuf::from("amadeus-testing/parquet/nulls.snappy.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -857,9 +899,10 @@ async fn run(pool: &P) -> Duration { 8 ); - let rows = Parquet::<_, Value>::new(PathBuf::from( - "amadeus-testing/parquet/nulls.snappy.parquet", - ), None) + let rows = Parquet::<_, Value>::new( + PathBuf::from("amadeus-testing/parquet/nulls.snappy.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -882,9 +925,10 @@ async fn run(pool: &P) -> Duration { #[amadeus(name = "phoneNumbers")] phone_numbers: Option<(List<(i64, Option)>,)>, } - let rows = Parquet::<_, Repeated>::new(PathBuf::from( - "amadeus-testing/parquet/repeated_no_annotation.parquet", - ), None) + let rows = Parquet::<_, Repeated>::new( + PathBuf::from("amadeus-testing/parquet/repeated_no_annotation.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -895,9 +939,10 @@ async fn run(pool: &P) -> Duration { 6 ); - let rows = Parquet::<_, RepeatedDerived>::new(PathBuf::from( - "amadeus-testing/parquet/repeated_no_annotation.parquet", - ), None) + let rows = Parquet::<_, RepeatedDerived>::new( + PathBuf::from("amadeus-testing/parquet/repeated_no_annotation.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -908,9 +953,10 @@ async fn run(pool: &P) -> Duration { 6 ); - let rows = Parquet::<_, Value>::new(PathBuf::from( - "amadeus-testing/parquet/repeated_no_annotation.parquet", - ), None) + let rows = Parquet::<_, Value>::new( + PathBuf::from("amadeus-testing/parquet/repeated_no_annotation.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -935,9 +981,10 @@ async fn run(pool: &P) -> Duration { d: bool, e: Option>, } - let rows = Parquet::<_, TestDatapage>::new(PathBuf::from( - "amadeus-testing/parquet/datapage_v2.snappy.parquet", - ), None) + let rows = Parquet::<_, TestDatapage>::new( + PathBuf::from("amadeus-testing/parquet/datapage_v2.snappy.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -948,9 +995,10 @@ async fn run(pool: &P) -> Duration { 5 ); - let rows = Parquet::<_, TestDatapageDerived>::new(PathBuf::from( - "amadeus-testing/parquet/datapage_v2.snappy.parquet", - ), None) + let rows = Parquet::<_, TestDatapageDerived>::new( + PathBuf::from("amadeus-testing/parquet/datapage_v2.snappy.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -961,9 +1009,10 @@ async fn run(pool: &P) -> Duration { 5 ); - let rows = Parquet::<_, Value>::new(PathBuf::from( - "amadeus-testing/parquet/datapage_v2.snappy.parquet", - ), None) + let rows = Parquet::<_, Value>::new( + PathBuf::from("amadeus-testing/parquet/datapage_v2.snappy.parquet"), + None, + ) .await .unwrap(); assert_eq!( @@ -1006,10 +1055,12 @@ async fn run(pool: &P) -> Duration { __index_level_0__: Option, } - let rows = - Parquet::<_, CommitsDerived>::new(PathBuf::from("amadeus-testing/parquet/commits.parquet"), None) - .await - .unwrap(); + let rows = Parquet::<_, CommitsDerived>::new( + PathBuf::from("amadeus-testing/parquet/commits.parquet"), + None, + ) + .await + .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) @@ -1018,9 +1069,12 @@ async fn run(pool: &P) -> Duration { 14_444 ); - let rows = Parquet::<_, Value>::new(PathBuf::from("amadeus-testing/parquet/commits.parquet"), None) - .await - .unwrap(); + let rows = Parquet::<_, Value>::new( + PathBuf::from("amadeus-testing/parquet/commits.parquet"), + None, + ) + .await + .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result| -> Value { From bda7f89b48b94bf22e1fbefcf364b49c43bb1df4 Mon Sep 17 00:00:00 2001 From: Robin Bernon Date: Fri, 26 Mar 2021 23:12:08 +0000 Subject: [PATCH 03/11] Small fix --- tests/parquet.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/parquet.rs b/tests/parquet.rs index 4ec2fb49..13ebbf14 100644 --- a/tests/parquet.rs +++ b/tests/parquet.rs @@ -31,7 +31,7 @@ fn assert_columns_in_row( fn get_row_predicate(column_names: Vec) -> Option { Some(ValuePredicate::Group(Some(GroupPredicate::new( - column_names.into_iter().zip(None.into_iter()), + column_names.into_iter().map(|x| (x, None)), )))) } @@ -40,7 +40,7 @@ fn get_row_predicate(column_names: Vec) -> Option { async fn dynamic_reads() { let start = SystemTime::now(); - let pool = &ThreadPool::new(None).unwrap(); + let pool = &ThreadPool::new(None, None).unwrap(); let column_names = vec!["uri".to_string(), "location".to_string()]; From 22988c41c5329270273461fc02d8f2c91e1de9b7 Mon Sep 17 00:00:00 2001 From: Robin Bernon Date: Sat, 27 Mar 2021 11:12:23 +0000 Subject: [PATCH 04/11] Num cols check --- tests/parquet.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/parquet.rs b/tests/parquet.rs index 13ebbf14..ec985271 100644 --- a/tests/parquet.rs +++ b/tests/parquet.rs @@ -19,6 +19,8 @@ fn assert_columns_in_row( let row_group = row_unwrapped.clone().into_group().ok().unwrap(); let field_names = row_group.field_names().unwrap(); + assert_eq!(expected_column_names.len(), field_names.len()); + // Adds extra side check to make sure that order is also maintained. for (expected_column_name, (actual_column_name, _)) in expected_column_names.iter().zip(field_names.iter()) From edd002fc98cc8b0d52ff8f0a62dc01914a5edfd5 Mon Sep 17 00:00:00 2001 From: Robin Bernon Date: Tue, 30 Mar 2021 12:58:11 +0100 Subject: [PATCH 05/11] Adding helper functions to make it easier to work with data in dynamic form. --- README.md | 4 +- amadeus-core/src/lib.rs | 2 +- amadeus-parquet/src/lib.rs | 6 ++ src/helpers/filter_nulls_and_unwrap.rs | 91 ++++++++++++++++++++++++++ src/helpers/get_field_from_value.rs | 64 ++++++++++++++++++ src/helpers/mod.rs | 7 ++ src/lib.rs | 3 +- tests/parquet.rs | 10 +-- tests/parquet_wasm.rs | 2 +- 9 files changed, 175 insertions(+), 14 deletions(-) create mode 100644 src/helpers/filter_nulls_and_unwrap.rs create mode 100644 src/helpers/get_field_from_value.rs create mode 100644 src/helpers/mod.rs diff --git a/README.md b/README.md index 05f8eb02..5b0a2974 100644 --- a/README.md +++ b/README.md @@ -105,7 +105,7 @@ async fn main() -> Result<(), Box> { "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/", AwsCredentials::Anonymous, - ))) + )), None) .await?; let top_pages = rows @@ -140,7 +140,7 @@ async fn main() -> Result<(), Box> { "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/", AwsCredentials::Anonymous, - ))) + )), None) .await?; let top_pages = rows diff --git a/amadeus-core/src/lib.rs b/amadeus-core/src/lib.rs index ea26911b..8f3648bb 100644 --- a/amadeus-core/src/lib.rs +++ b/amadeus-core/src/lib.rs @@ -7,7 +7,7 @@ //! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. All functionality is re-exposed in [`amadeus`](https://docs.rs/amadeus/0.3/amadeus/). #![doc(html_root_url = "https://docs.rs/amadeus-core/0.4.2")] -#![cfg_attr(nightly, feature(unboxed_closures))] +#![cfg_attr(nightly, feature(unboxed_closures, fn_traits))] #![recursion_limit = "25600"] #![warn( // missing_copy_implementations, diff --git a/amadeus-parquet/src/lib.rs b/amadeus-parquet/src/lib.rs index 40f8259f..fbbaf8c2 100644 --- a/amadeus-parquet/src/lib.rs +++ b/amadeus-parquet/src/lib.rs @@ -73,6 +73,12 @@ mod wrap { }; } + pub fn get_row_predicate(column_names: Vec) -> Option { + Some(ValuePredicate::Group(Some(GroupPredicate::new( + column_names.into_iter().map(|x| (x, None)), + )))) + } + #[derive(Educe)] #[educe(Clone, Debug)] pub struct Parquet diff --git a/src/helpers/filter_nulls_and_unwrap.rs b/src/helpers/filter_nulls_and_unwrap.rs new file mode 100644 index 00000000..440a7a78 --- /dev/null +++ b/src/helpers/filter_nulls_and_unwrap.rs @@ -0,0 +1,91 @@ +use crate::par_stream::{Filter, Map}; +use amadeus_core::par_stream::ParallelStream; + +pub trait FilterNullsAndUnwrap +where + O: Send + 'static, +{ + fn filter_nulls_and_unwrap(self) + -> Map, fn(Option) -> O>; +} + +impl FilterNullsAndUnwrap for T +where + T: ParallelStream>, + O: Send + 'static, +{ + fn filter_nulls_and_unwrap( + self, + ) -> Map, fn(Option) -> O> { + self.filter(OptionFilterNullHandler {}) + .map(|unwrapped_value: Option| unwrapped_value.unwrap()) + } +} + +pub trait FilterNullsAndDoubleUnwrap +where + O: Send + 'static, +{ + fn filter_nulls_and_double_unwrap( + self, + ) -> Map, fn(Option>) -> O>; +} + +impl FilterNullsAndDoubleUnwrap for T +where + T: ParallelStream>>, + O: Send + 'static, +{ + fn filter_nulls_and_double_unwrap( + self, + ) -> Map, fn(Option>) -> O> { + self.filter(DoubleOptionFilterNullHandler {}) + .map(|unwrapped_value: Option>| unwrapped_value.unwrap().unwrap()) + } +} + +#[derive(Clone)] +pub struct DoubleOptionFilterNullHandler {} + +impl FnOnce<(&Option>,)> for DoubleOptionFilterNullHandler +where + T: Send + 'static, +{ + type Output = bool; + + extern "rust-call" fn call_once(mut self, args: (&Option>,)) -> Self::Output { + self.call_mut(args) + } +} + +impl FnMut<(&Option>,)> for DoubleOptionFilterNullHandler +where + T: Send + 'static, +{ + extern "rust-call" fn call_mut(&mut self, args: (&Option>,)) -> Self::Output { + args.0.is_some() && args.0.as_ref().unwrap().is_some() + } +} + +#[derive(Clone)] +pub struct OptionFilterNullHandler {} + +impl FnOnce<(&Option,)> for OptionFilterNullHandler +where + T: Send + 'static, +{ + type Output = bool; + + extern "rust-call" fn call_once(mut self, args: (&Option,)) -> Self::Output { + self.call_mut(args) + } +} + +impl FnMut<(&Option,)> for OptionFilterNullHandler +where + T: Send + 'static, +{ + extern "rust-call" fn call_mut(&mut self, args: (&Option,)) -> Self::Output { + args.0.is_some() + } +} diff --git a/src/helpers/get_field_from_value.rs b/src/helpers/get_field_from_value.rs new file mode 100644 index 00000000..c617297a --- /dev/null +++ b/src/helpers/get_field_from_value.rs @@ -0,0 +1,64 @@ +use crate::par_stream::{Map, ParallelStream}; +use amadeus_types::{DowncastFrom, Value}; +use std::marker::PhantomData; + +pub trait GetFieldFromValue { + fn get_field_from_value(self, field_name: String) -> Map> + where + O: DowncastFrom + Clone + Send + 'static; +} + +impl GetFieldFromValue for T +where + T: ParallelStream>, + E: Send + 'static, +{ + fn get_field_from_value(self, field_name: String) -> Map> + where + O: DowncastFrom + Clone + Send + 'static, + { + self.map(UnwrapFieldHandler:: { + field_name, + marker: PhantomData, + }) + } +} + +#[derive(Clone)] +pub struct UnwrapFieldHandler +where + T: DowncastFrom + Clone + Send + 'static, +{ + field_name: String, + marker: PhantomData, +} + +impl FnOnce<(Result,)> for UnwrapFieldHandler +where + T: DowncastFrom + Clone + Send + 'static, +{ + type Output = Option; + + extern "rust-call" fn call_once(mut self, args: (Result,)) -> Self::Output { + self.call_mut(args) + } +} + +impl FnMut<(Result,)> for UnwrapFieldHandler +where + T: DowncastFrom + Clone + Send + 'static, +{ + extern "rust-call" fn call_mut(&mut self, args: (Result,)) -> Self::Output { + let field = args + .0 + .ok() + .unwrap() + .into_group() + .ok() + .unwrap() + .get(self.field_name.as_ref()) + .unwrap() + .clone(); + T::downcast_from(field).ok() + } +} diff --git a/src/helpers/mod.rs b/src/helpers/mod.rs new file mode 100644 index 00000000..68aafbf5 --- /dev/null +++ b/src/helpers/mod.rs @@ -0,0 +1,7 @@ +mod filter_nulls_and_unwrap; +mod get_field_from_value; + +pub use filter_nulls_and_unwrap::{ + DoubleOptionFilterNullHandler, FilterNullsAndDoubleUnwrap, FilterNullsAndUnwrap, OptionFilterNullHandler +}; +pub use get_field_from_value::{GetFieldFromValue, UnwrapFieldHandler}; diff --git a/src/lib.rs b/src/lib.rs index 498a34b2..c7dbafa6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,7 +8,7 @@ #![doc( html_logo_url = "https://raw.githubusercontent.com/constellation-rs/amadeus/master/logo.svg?sanitize=true" )] -#![cfg_attr(nightly, feature(unboxed_closures))] +#![cfg_attr(nightly, feature(unboxed_closures, fn_traits))] #![warn( // missing_copy_implementations, // missing_debug_implementations, @@ -42,6 +42,7 @@ compile_error!("The Amadeus Parquet connector currently requires nightly"); doc_comment::doctest!("../README.md"); pub mod data; +pub mod helpers; pub mod pool; pub mod source; diff --git a/tests/parquet.rs b/tests/parquet.rs index ec985271..2925ede4 100644 --- a/tests/parquet.rs +++ b/tests/parquet.rs @@ -8,9 +8,7 @@ use std::{collections::HashMap, path::PathBuf, time::SystemTime}; use amadeus::prelude::*; - -use amadeus::amadeus_parquet::GroupPredicate; -use amadeus_parquet::ValuePredicate; +use amadeus_parquet::get_row_predicate; fn assert_columns_in_row( expected_column_names: &Vec, row: Result, @@ -31,12 +29,6 @@ fn assert_columns_in_row( Ok(row_unwrapped) } -fn get_row_predicate(column_names: Vec) -> Option { - Some(ValuePredicate::Group(Some(GroupPredicate::new( - column_names.into_iter().map(|x| (x, None)), - )))) -} - #[tokio::test(threaded_scheduler)] #[cfg_attr(miri, ignore)] async fn dynamic_reads() { diff --git a/tests/parquet_wasm.rs b/tests/parquet_wasm.rs index 2073459d..900941e0 100644 --- a/tests/parquet_wasm.rs +++ b/tests/parquet_wasm.rs @@ -55,7 +55,7 @@ async fn parquet() { PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=05/part-00025-96c249f4-3a10-4509-b6b8-693a5d90dbf3.c000.snappy.parquet"), PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=06/part-00185-96c249f4-3a10-4509-b6b8-693a5d90dbf3.c000.snappy.parquet"), PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=07/part-00151-96c249f4-3a10-4509-b6b8-693a5d90dbf3.c000.snappy.parquet"), - ]).await.unwrap(); + ], None).await.unwrap(); assert_eq!( rows.par_stream() .map(|row: Result<_, _>| row.unwrap()) From 75c4a17e008e788de9a40790312e6010c5816714 Mon Sep 17 00:00:00 2001 From: Robin Bernon Date: Tue, 30 Mar 2021 14:18:06 +0100 Subject: [PATCH 06/11] Updating dynamic example on the readme. --- README.md | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 5b0a2974..f945765d 100644 --- a/README.md +++ b/README.md @@ -130,27 +130,27 @@ This is typed, so faster, and it goes an analytics step further also, prints top ```rust use amadeus::prelude::*; use std::error::Error; +use amadeus::helpers::{FilterNullsAndDoubleUnwrap, GetFieldFromValue}; +use amadeus::amadeus_parquet::get_row_predicate; #[tokio::main] async fn main() -> Result<(), Box> { let pool = ThreadPool::new(None, None)?; + let column_name = "uri".to_string(); + let rows = Parquet::new(ParquetDirectory::new(S3Directory::new_with( AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/", AwsCredentials::Anonymous, - )), None) + )), get_row_predicate(vec![column_name.clone()])) .await?; let top_pages = rows .par_stream() - .map(|row: Result| { - let row = row.ok()?.into_group().ok()?; - row.get("uri")?.clone().into_url().ok() - }) - .filter(|row| row.is_some()) - .map(Option::unwrap) + .get_field_from_value::>(column_name.clone()) + .filter_nulls_and_double_unwrap() .most_frequent(&pool, 100, 0.99, 0.002) .await; From b5a3ab0cad0adbc1dd6cc2ab9cda74236446f235 Mon Sep 17 00:00:00 2001 From: Robin Bernon Date: Tue, 30 Mar 2021 17:10:49 +0100 Subject: [PATCH 07/11] Sorting out FxBuildHasher issue --- amadeus-parquet/Cargo.toml | 2 +- amadeus-parquet/src/internal/record/impls.rs | 2 +- amadeus-parquet/src/internal/record/predicates.rs | 5 +++-- amadeus-parquet/src/internal/record/reader.rs | 4 ++-- amadeus-parquet/src/internal/record/schemas.rs | 2 +- amadeus-serde/Cargo.toml | 2 +- amadeus-serde/src/impls.rs | 2 +- amadeus-types/Cargo.toml | 2 +- amadeus-types/src/group.rs | 2 +- amadeus-types/src/value.rs | 2 +- src/helpers/mod.rs | 6 ++---- 11 files changed, 15 insertions(+), 16 deletions(-) diff --git a/amadeus-parquet/Cargo.toml b/amadeus-parquet/Cargo.toml index 14afc1a9..c0cd9c72 100644 --- a/amadeus-parquet/Cargo.toml +++ b/amadeus-parquet/Cargo.toml @@ -29,7 +29,7 @@ educe = "0.4" flate2 = { version = "1.0.2", features = ["rust_backend"], default-features = false } futures = "0.3" fxhash = "0.2" -hashlink = { version = "0.6", features = ["serde_impl"] } +hashlink = { version = "0.6.1-alpha.0", features = ["serde_impl"], git = "https://github.com/robinbernon/hashlink", branch = "generic_hashmap_deserialization" } lz-fear = "0.1" num-bigint = "0.3" quick-error = "1.2.2" diff --git a/amadeus-parquet/src/internal/record/impls.rs b/amadeus-parquet/src/internal/record/impls.rs index 4604196b..6b5e2e89 100644 --- a/amadeus-parquet/src/internal/record/impls.rs +++ b/amadeus-parquet/src/internal/record/impls.rs @@ -1,4 +1,4 @@ -use hashlink::linked_hash_map::LinkedHashMap; +use hashlink::LinkedHashMap; use std::{ any::type_name, collections::HashMap, convert::{TryFrom, TryInto}, fmt, hash::{BuildHasher, Hash}, marker::PhantomData, string::FromUtf8Error, sync::Arc }; diff --git a/amadeus-parquet/src/internal/record/predicates.rs b/amadeus-parquet/src/internal/record/predicates.rs index f7852e06..d231f806 100644 --- a/amadeus-parquet/src/internal/record/predicates.rs +++ b/amadeus-parquet/src/internal/record/predicates.rs @@ -1,10 +1,11 @@ -use hashlink::linked_hash_map::LinkedHashMap; +use hashlink::LinkedHashMap; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use amadeus_types::{Bson, Date, DateTime, Decimal, Enum, Group, Json, List, Time, Value}; use crate::internal::record::ParquetData; +use fxhash::FxBuildHasher; #[derive(Clone, Debug, Serialize, Deserialize)] /// Predicate for [`Group`]s @@ -22,7 +23,7 @@ impl MapPredicate { /// Predicate for [`Group`]s pub struct GroupPredicate( /// Map of field names to predicates for the fields in the group - pub(super) LinkedHashMap::Predicate>>, + pub(super) LinkedHashMap::Predicate>, FxBuildHasher>, ); impl GroupPredicate { pub fn new(fields: I) -> Self diff --git a/amadeus-parquet/src/internal/record/reader.rs b/amadeus-parquet/src/internal/record/reader.rs index 6118ee22..da07663c 100644 --- a/amadeus-parquet/src/internal/record/reader.rs +++ b/amadeus-parquet/src/internal/record/reader.rs @@ -24,7 +24,7 @@ //! that are optional or repeated. use fxhash::FxBuildHasher; -use hashlink::linked_hash_map::LinkedHashMap; +use hashlink::LinkedHashMap; use std::{ collections::HashMap, convert::TryInto, error::Error, marker::PhantomData, mem, sync::Arc }; @@ -948,7 +948,7 @@ where mod tests { use super::*; - use hashlink::linked_hash_map::LinkedHashMap; + use hashlink::LinkedHashMap; use std::{collections::HashMap, sync::Arc}; use crate::internal::{ diff --git a/amadeus-parquet/src/internal/record/schemas.rs b/amadeus-parquet/src/internal/record/schemas.rs index 0190a75e..beef783b 100644 --- a/amadeus-parquet/src/internal/record/schemas.rs +++ b/amadeus-parquet/src/internal/record/schemas.rs @@ -31,7 +31,7 @@ //! ``` use fxhash::FxBuildHasher; -use hashlink::linked_hash_map::LinkedHashMap; +use hashlink::LinkedHashMap; use std::{ fmt::{self, Debug, Display}, marker::PhantomData, mem, str::FromStr }; diff --git a/amadeus-serde/Cargo.toml b/amadeus-serde/Cargo.toml index 4563bccb..d10f394a 100644 --- a/amadeus-serde/Cargo.toml +++ b/amadeus-serde/Cargo.toml @@ -25,7 +25,7 @@ chrono = { version = "0.4", default-features = false, features = ["serde"] } csv = "1.0" educe = "0.4" futures = "0.3" -hashlink = "0.6" +hashlink = { version = "0.6.1-alpha.0", git = "https://github.com/robinbernon/hashlink", branch = "generic_hashmap_deserialization" } serde = { version = "1.0", features = ["derive"] } serde_bytes = "0.11" serde_closure = "0.3" diff --git a/amadeus-serde/src/impls.rs b/amadeus-serde/src/impls.rs index 8e434ca8..07e6e7bf 100644 --- a/amadeus-serde/src/impls.rs +++ b/amadeus-serde/src/impls.rs @@ -1,6 +1,6 @@ #![allow(clippy::too_many_lines)] -use hashlink::linked_hash_map::LinkedHashMap; +use hashlink::LinkedHashMap; use recycle::VecExt; use serde::{ de::{self, MapAccess, SeqAccess, Visitor}, ser::{SerializeSeq, SerializeStruct, SerializeTupleStruct}, Deserializer, Serializer diff --git a/amadeus-types/Cargo.toml b/amadeus-types/Cargo.toml index 301d9b14..f97dc7ff 100644 --- a/amadeus-types/Cargo.toml +++ b/amadeus-types/Cargo.toml @@ -23,7 +23,7 @@ amadeus-core = { version = "=0.4.2", path = "../amadeus-core" } chrono = { version = "0.4", default-features = false, features = ["std", "serde"] } chrono-tz = { version = "0.5", features = ["serde"] } fxhash = "0.2" -hashlink = "0.6" +hashlink = { version = "0.6.1-alpha.0", git = "https://github.com/robinbernon/hashlink", branch = "generic_hashmap_deserialization" } once_cell = "1.0" ordered-float = "2.0" serde = { version = "1.0", features = ["derive"] } diff --git a/amadeus-types/src/group.rs b/amadeus-types/src/group.rs index c831a985..626316dd 100644 --- a/amadeus-types/src/group.rs +++ b/amadeus-types/src/group.rs @@ -1,7 +1,7 @@ //! Implement [`Record`] for [`Group`] aka [`Row`]. use fxhash::FxBuildHasher; -use hashlink::linked_hash_map::LinkedHashMap; +use hashlink::LinkedHashMap; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::{ cmp::Ordering, fmt::{self, Debug}, ops::Index, slice::SliceIndex, str, sync::Arc diff --git a/amadeus-types/src/value.rs b/amadeus-types/src/value.rs index cd8d4077..48784ae7 100644 --- a/amadeus-types/src/value.rs +++ b/amadeus-types/src/value.rs @@ -3,7 +3,7 @@ #![allow(clippy::type_complexity)] use fxhash::FxBuildHasher; -use hashlink::linked_hash_map::LinkedHashMap; +use hashlink::LinkedHashMap; use recycle::VecExt; use serde::{de::Deserializer, ser::Serializer, Deserialize, Serialize}; use std::{ diff --git a/src/helpers/mod.rs b/src/helpers/mod.rs index 68aafbf5..f7532e76 100644 --- a/src/helpers/mod.rs +++ b/src/helpers/mod.rs @@ -1,7 +1,5 @@ mod filter_nulls_and_unwrap; mod get_field_from_value; -pub use filter_nulls_and_unwrap::{ - DoubleOptionFilterNullHandler, FilterNullsAndDoubleUnwrap, FilterNullsAndUnwrap, OptionFilterNullHandler -}; -pub use get_field_from_value::{GetFieldFromValue, UnwrapFieldHandler}; +pub use filter_nulls_and_unwrap::{FilterNullsAndDoubleUnwrap, FilterNullsAndUnwrap}; +pub use get_field_from_value::GetFieldFromValue; From 19724895ff245ec514af99a8f856c3c437ce4e43 Mon Sep 17 00:00:00 2001 From: alecmocatta Date: Mon, 5 Apr 2021 12:09:14 +0100 Subject: [PATCH 08/11] clippy fixes --- src/helpers/filter_nulls_and_unwrap.rs | 5 +++-- src/pool/util.rs | 6 +++++- tests/parquet.rs | 26 +++++++++++--------------- 3 files changed, 19 insertions(+), 18 deletions(-) diff --git a/src/helpers/filter_nulls_and_unwrap.rs b/src/helpers/filter_nulls_and_unwrap.rs index 440a7a78..5435d8d2 100644 --- a/src/helpers/filter_nulls_and_unwrap.rs +++ b/src/helpers/filter_nulls_and_unwrap.rs @@ -1,3 +1,5 @@ +#![allow(clippy::type_complexity, clippy::option_option)] + use crate::par_stream::{Filter, Map}; use amadeus_core::par_stream::ParallelStream; @@ -17,8 +19,7 @@ where fn filter_nulls_and_unwrap( self, ) -> Map, fn(Option) -> O> { - self.filter(OptionFilterNullHandler {}) - .map(|unwrapped_value: Option| unwrapped_value.unwrap()) + self.filter(OptionFilterNullHandler {}).map(Option::unwrap) } } diff --git a/src/pool/util.rs b/src/pool/util.rs index af61c67d..7db0e9c0 100644 --- a/src/pool/util.rs +++ b/src/pool/util.rs @@ -89,7 +89,11 @@ impl Synchronize { F: Future, { let nonce = self.nonce.load(Ordering::SeqCst); - if !self.running.compare_and_swap(false, true, Ordering::SeqCst) { + if self + .running + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + { let on_drop = OnDrop::new(|| self.running.store(false, Ordering::SeqCst)); f.await; on_drop.cancel(); diff --git a/tests/parquet.rs b/tests/parquet.rs index 2925ede4..acedfe75 100644 --- a/tests/parquet.rs +++ b/tests/parquet.rs @@ -8,10 +8,9 @@ use std::{collections::HashMap, path::PathBuf, time::SystemTime}; use amadeus::prelude::*; -use amadeus_parquet::get_row_predicate; fn assert_columns_in_row( - expected_column_names: &Vec, row: Result, + expected_column_names: &[String], row: Result, ) -> Result { let row_unwrapped = row.ok().unwrap(); let row_group = row_unwrapped.clone().into_group().ok().unwrap(); @@ -38,20 +37,17 @@ async fn dynamic_reads() { let column_names = vec!["uri".to_string(), "location".to_string()]; - #[cfg(feature = "aws")] - { - let rows = Parquet::<_, Value>::new(vec![ - S3File::new_with(AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/year=2018/month=11/day=02/part-00176-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet", AwsCredentials::Anonymous), - ], get_row_predicate(column_names.clone())).await.unwrap(); + let rows = Parquet::<_, Value>::new(vec![ + PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=02/part-00176-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet"), + ], amadeus_parquet::get_row_predicate(column_names.clone())).await.unwrap(); - assert_eq!( - rows.par_stream() - .map(move |row: Result| assert_columns_in_row(&column_names, row)) - .count(pool) - .await, - 4172 - ); - } + assert_eq!( + rows.par_stream() + .map(move |row: Result| assert_columns_in_row(&column_names, row)) + .count(pool) + .await, + 4172 + ); println!("in {:?}", start.elapsed().unwrap()); } From 7b4e35fb7f5cd280a642c52c7db6c8b14e2cd669 Mon Sep 17 00:00:00 2001 From: alecmocatta Date: Mon, 5 Apr 2021 13:05:48 +0100 Subject: [PATCH 09/11] bump clippy --- amadeus-aws/src/lib.rs | 6 ++++-- amadeus-commoncrawl/src/lib.rs | 6 ++++-- amadeus-commoncrawl/src/parser.rs | 5 +---- amadeus-core/src/file.rs | 7 +++---- amadeus-core/src/lib.rs | 5 ++++- amadeus-core/src/par_stream.rs | 8 ++++---- amadeus-core/src/pipe.rs | 2 +- amadeus-core/src/pipe/flat_map.rs | 6 ++---- amadeus-core/src/pipe/flat_map_sync.rs | 6 ++---- amadeus-core/src/pipe/flatten.rs | 6 ++---- amadeus-parquet/src/lib.rs | 2 +- amadeus-postgres/src/lib.rs | 7 ++++--- amadeus-serde/src/lib.rs | 5 +++-- amadeus-streaming/src/distinct.rs | 2 +- amadeus-streaming/src/lib.rs | 4 +++- amadeus-streaming/src/sort.rs | 6 +++--- amadeus-types/src/lib.rs | 3 ++- amadeus-types/src/util.rs | 16 ++++++++-------- azure-pipelines.yml | 8 ++++---- src/lib.rs | 3 ++- src/pool/thread.rs | 2 +- 21 files changed, 59 insertions(+), 56 deletions(-) diff --git a/amadeus-aws/src/lib.rs b/amadeus-aws/src/lib.rs index a3df38ac..20ca9bbb 100644 --- a/amadeus-aws/src/lib.rs +++ b/amadeus-aws/src/lib.rs @@ -7,7 +7,7 @@ //! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exposed in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html). #![doc(html_root_url = "https://docs.rs/amadeus-aws/0.4.2")] -#![cfg_attr(nightly, feature(type_alias_impl_trait))] +#![cfg_attr(nightly, feature(min_type_alias_impl_trait))] #![warn( // missing_copy_implementations, // missing_debug_implementations, @@ -26,7 +26,9 @@ clippy::must_use_candidate, clippy::type_repetition_in_bounds, clippy::filter_map, - clippy::missing_errors_doc + clippy::missing_errors_doc, + clippy::missing_panics_doc, + clippy::let_underscore_drop )] #![deny(unsafe_code)] diff --git a/amadeus-commoncrawl/src/lib.rs b/amadeus-commoncrawl/src/lib.rs index 45881478..02f1fd67 100644 --- a/amadeus-commoncrawl/src/lib.rs +++ b/amadeus-commoncrawl/src/lib.rs @@ -7,7 +7,7 @@ //! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exposed in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html). #![doc(html_root_url = "https://docs.rs/amadeus-commoncrawl/0.4.2")] -#![cfg_attr(nightly, feature(type_alias_impl_trait))] +#![cfg_attr(nightly, feature(min_type_alias_impl_trait))] #![warn( // missing_copy_implementations, // missing_debug_implementations, @@ -22,7 +22,9 @@ #![allow( clippy::doc_markdown, clippy::inline_always, - clippy::missing_errors_doc + clippy::missing_errors_doc, + clippy::missing_panics_doc, + clippy::let_underscore_drop )] #![deny(unsafe_code)] diff --git a/amadeus-commoncrawl/src/parser.rs b/amadeus-commoncrawl/src/parser.rs index 82ec7919..f546f9af 100644 --- a/amadeus-commoncrawl/src/parser.rs +++ b/amadeus-commoncrawl/src/parser.rs @@ -53,10 +53,7 @@ impl<'a> fmt::Debug for Record<'a> { // write!(form, "\n").unwrap(); // } writeln!(form, "Content Length:{}", self.content.len()).unwrap(); - let s = match str::from_utf8(self.content) { - Ok(s) => s, - Err(_) => "Could not convert", - }; + let s = str::from_utf8(self.content).unwrap_or("Could not convert"); writeln!(form, "Content :{:?}", s).unwrap(); writeln!(form) } diff --git a/amadeus-core/src/file.rs b/amadeus-core/src/file.rs index 92df376f..f15059ff 100644 --- a/amadeus-core/src/file.rs +++ b/amadeus-core/src/file.rs @@ -31,7 +31,7 @@ impl OsString { pub fn to_string_lossy(&self) -> String { self.buf.to_string_lossy() } - pub fn display<'a>(&'a self) -> impl fmt::Display + 'a { + pub fn display(&self) -> impl fmt::Display + '_ { struct Display<'a>(&'a OsString); impl<'a> fmt::Display for Display<'a> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -160,15 +160,14 @@ impl PathBuf { pub fn iter<'a>(&'a self) -> impl Iterator + 'a { self.components.iter() } - pub fn display<'a>(&'a self) -> impl fmt::Display + 'a { + pub fn display(&self) -> impl fmt::Display + '_ { struct Display<'a>(&'a PathBuf); impl<'a> fmt::Display for Display<'a> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let mut res: fmt::Result = self .0 .iter() - .map(|component| write!(f, "{}/", component.to_string_lossy())) - .collect(); + .try_for_each(|component| write!(f, "{}/", component.to_string_lossy())); if let Some(file_name) = self.0.file_name() { res = res.and_then(|()| write!(f, "{}", file_name.to_string_lossy())); } diff --git a/amadeus-core/src/lib.rs b/amadeus-core/src/lib.rs index 8f3648bb..27047f3c 100644 --- a/amadeus-core/src/lib.rs +++ b/amadeus-core/src/lib.rs @@ -33,7 +33,10 @@ clippy::default_trait_access, clippy::filter_map, clippy::wildcard_imports, - clippy::needless_pass_by_value + clippy::needless_pass_by_value, + clippy::unnecessary_wraps, + clippy::missing_panics_doc, + clippy::let_underscore_drop )] #![deny(unsafe_code)] diff --git a/amadeus-core/src/par_stream.rs b/amadeus-core/src/par_stream.rs index 495399be..297ad413 100644 --- a/amadeus-core/src/par_stream.rs +++ b/amadeus-core/src/par_stream.rs @@ -23,7 +23,7 @@ use futures::{future, pin_mut, stream::StreamExt as _, Stream}; use indexmap::IndexMap; use serde_closure::{traits, FnOnce}; use std::{ - cmp::Ordering, hash::Hash, iter, ops, pin::Pin, task::{Context, Poll}, vec + cmp::Ordering, hash::Hash, iter, ops, pin::Pin, task::{Context, Poll} }; use super::{par_pipe::*, par_sink::*}; @@ -434,7 +434,7 @@ stream!(ParallelStream ParallelPipe ParallelSink FromParallelStream IntoParallel let self_ = self; pin_mut!(self_); // TODO: don't buffer tasks before sending. requires changes to ThreadPool - let mut tasks = (0..pool.threads()).map(|_| vec![]).collect::>(); + let mut tasks = (0..pool.threads()).map(|_| Vec::new()).collect::>(); let mut allocated = 0; 'a: loop { for i in 0..tasks.len() { @@ -597,7 +597,7 @@ stream!(DistributedStream DistributedPipe DistributedSink FromDistributedStream let self_ = self; pin_mut!(self_); // TODO: don't buffer tasks before sending. requires changes to ProcessPool - let mut tasks = (0..pool.processes()).map(|_| vec![]).collect::>(); + let mut tasks = (0..pool.processes()).map(|_| Vec::new()).collect::>(); let mut allocated = 0; 'a: loop { for i in 0..tasks.len() { @@ -647,7 +647,7 @@ stream!(DistributedStream DistributedPipe DistributedSink FromDistributedStream pool.spawn(FnOnce!(move |pool: &P::ThreadPool| { let mut process_tasks = tasks.into_iter(); - let mut tasks = (0..pool.threads()).map(|_| vec![]).collect::>(); + let mut tasks = (0..pool.threads()).map(|_| Vec::new()).collect::>(); let mut allocated = 0; 'a: loop { for i in 0..tasks.len() { diff --git a/amadeus-core/src/pipe.rs b/amadeus-core/src/pipe.rs index 714c6ca1..1fb20acf 100644 --- a/amadeus-core/src/pipe.rs +++ b/amadeus-core/src/pipe.rs @@ -306,7 +306,7 @@ impl + Unpin, St: ?Sized + Stream + Unpin, I let items = &mut **self_.items; let mut given_all = false; let stream = stream::poll_fn(|cx| match Pin::new(&mut *items).poll_next(cx) { - x @ Poll::Ready(Some(_)) | x @ Poll::Pending => x, + x @ (Poll::Ready(Some(_)) | Poll::Pending) => x, Poll::Ready(None) => { given_all = true; Poll::Pending diff --git a/amadeus-core/src/pipe/flat_map.rs b/amadeus-core/src/pipe/flat_map.rs index 917789b6..21f0b1ab 100644 --- a/amadeus-core/src/pipe/flat_map.rs +++ b/amadeus-core/src/pipe/flat_map.rs @@ -33,9 +33,8 @@ where if let Some(s) = self_.next.as_mut().as_pin_mut() { if let Some(item) = ready!(s.poll_next(cx)) { break Some(item); - } else { - self_.next.set(None); } + self_.next.set(None); } else if let Some(s) = ready!(self_.pipe.as_mut().poll_next(cx)) { self_.next.set(Some(self_.f.call_mut((s,)))); } else { @@ -61,9 +60,8 @@ where if let Some(s) = self_.next.as_mut().as_pin_mut() { if let Some(item) = ready!(s.poll_next(cx)) { break Some(item); - } else { - self_.next.set(None); } + self_.next.set(None); } else if let Some(s) = ready!(self_.pipe.as_mut().poll_next(cx, stream.as_mut())) { self_.next.set(Some(self_.f.call_mut((s,)))); } else { diff --git a/amadeus-core/src/pipe/flat_map_sync.rs b/amadeus-core/src/pipe/flat_map_sync.rs index cb5ab406..c6cf0b1e 100644 --- a/amadeus-core/src/pipe/flat_map_sync.rs +++ b/amadeus-core/src/pipe/flat_map_sync.rs @@ -32,9 +32,8 @@ where if let Some(s) = self_.next.as_mut() { if let Some(item) = s.next() { break Some(item); - } else { - *self_.next = None; } + *self_.next = None; } else if let Some(s) = ready!(self_.pipe.as_mut().poll_next(cx)) { *self_.next = Some(self_.f.call_mut((s,))); } else { @@ -60,9 +59,8 @@ where if let Some(s) = self_.next.as_mut() { if let Some(item) = s.next() { break Some(item); - } else { - *self_.next = None; } + *self_.next = None; } else if let Some(s) = ready!(self_.pipe.as_mut().poll_next(cx, stream.as_mut())) { *self_.next = Some(self_.f.call_mut((s,))); } else { diff --git a/amadeus-core/src/pipe/flatten.rs b/amadeus-core/src/pipe/flatten.rs index f8f9e325..50b8ee13 100644 --- a/amadeus-core/src/pipe/flatten.rs +++ b/amadeus-core/src/pipe/flatten.rs @@ -30,9 +30,8 @@ where if let Some(s) = self_.next.as_mut().as_pin_mut() { if let Some(item) = ready!(s.poll_next(cx)) { break Some(item); - } else { - self_.next.set(None); } + self_.next.set(None); } else if let Some(s) = ready!(self_.pipe.as_mut().poll_next(cx)) { self_.next.set(Some(s)); } else { @@ -57,9 +56,8 @@ where if let Some(s) = self_.next.as_mut().as_pin_mut() { if let Some(item) = ready!(s.poll_next(cx)) { break Some(item); - } else { - self_.next.set(None); } + self_.next.set(None); } else if let Some(s) = ready!(self_.pipe.as_mut().poll_next(cx, stream.as_mut())) { self_.next.set(Some(s)); } else { diff --git a/amadeus-parquet/src/lib.rs b/amadeus-parquet/src/lib.rs index fbbaf8c2..7587e6d1 100644 --- a/amadeus-parquet/src/lib.rs +++ b/amadeus-parquet/src/lib.rs @@ -10,7 +10,7 @@ #![cfg_attr(nightly, feature(bufreader_seek_relative))] #![cfg_attr(nightly, feature(read_initializer))] #![cfg_attr(nightly, feature(specialization))] -#![cfg_attr(nightly, feature(type_alias_impl_trait))] +#![cfg_attr(nightly, feature(min_type_alias_impl_trait))] #![cfg_attr(nightly, feature(test))] #![warn( // missing_copy_implementations, diff --git a/amadeus-postgres/src/lib.rs b/amadeus-postgres/src/lib.rs index f0a3b867..5e913607 100644 --- a/amadeus-postgres/src/lib.rs +++ b/amadeus-postgres/src/lib.rs @@ -7,7 +7,7 @@ //! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exposed in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html). #![doc(html_root_url = "https://docs.rs/amadeus-postgres/0.4.2")] -#![cfg_attr(nightly, feature(type_alias_impl_trait))] +#![cfg_attr(nightly, feature(min_type_alias_impl_trait))] #![warn( // missing_copy_implementations, // missing_debug_implementations, @@ -24,7 +24,8 @@ clippy::similar_names, clippy::if_not_else, clippy::must_use_candidate, - clippy::missing_errors_doc + clippy::missing_errors_doc, + clippy::let_underscore_drop )] #![deny(unsafe_code)] @@ -415,7 +416,7 @@ where } let (head, tail) = buf.split_at(len); *buf = tail; - Some(&head[..]) + Some(head) }; T::decode(type_, value) } diff --git a/amadeus-serde/src/lib.rs b/amadeus-serde/src/lib.rs index eb8ea40f..2375512a 100644 --- a/amadeus-serde/src/lib.rs +++ b/amadeus-serde/src/lib.rs @@ -7,7 +7,7 @@ //! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exposed in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html). #![doc(html_root_url = "https://docs.rs/amadeus-serde/0.4.2")] -#![cfg_attr(nightly, feature(type_alias_impl_trait))] +#![cfg_attr(nightly, feature(min_type_alias_impl_trait))] #![warn( // missing_copy_implementations, // missing_debug_implementations, @@ -26,7 +26,8 @@ clippy::must_use_candidate, clippy::missing_errors_doc, clippy::needless_pass_by_value, - clippy::default_trait_access + clippy::default_trait_access, + clippy::needless_question_mark )] #![deny(unsafe_code)] diff --git a/amadeus-streaming/src/distinct.rs b/amadeus-streaming/src/distinct.rs index 7d99c273..c08f0dda 100644 --- a/amadeus-streaming/src/distinct.rs +++ b/amadeus-streaming/src/distinct.rs @@ -337,7 +337,7 @@ where } fn get_alpha(p: u8) -> f64 { - assert!(4 <= p && p <= 16); + assert!((4..=16).contains(&p)); match p { 4 => 0.673, 5 => 0.697, diff --git a/amadeus-streaming/src/lib.rs b/amadeus-streaming/src/lib.rs index ab8040cf..3d827d31 100644 --- a/amadeus-streaming/src/lib.rs +++ b/amadeus-streaming/src/lib.rs @@ -53,7 +53,9 @@ clippy::float_cmp, clippy::unsafe_derive_deserialize, clippy::must_use_candidate, - clippy::unused_self + clippy::unused_self, + clippy::missing_panics_doc, + clippy::let_underscore_drop )] mod count_min; diff --git a/amadeus-streaming/src/sort.rs b/amadeus-streaming/src/sort.rs index fbf3c083..1d69ea68 100644 --- a/amadeus-streaming/src/sort.rs +++ b/amadeus-streaming/src/sort.rs @@ -194,7 +194,7 @@ mod btree_set { } fn trivial_ord_mut(&mut self) -> &mut std::collections::BTreeSet>> { let set: *mut std::collections::BTreeSet> = &mut self.set; - let set: *mut std::collections::BTreeSet>> = set as _; + let set: *mut std::collections::BTreeSet>> = set.cast(); // Sound due to repr(transparent) unsafe { &mut *set } } @@ -208,7 +208,7 @@ mod btree_set { } pub fn remove(&mut self, value: &T) -> Option { let value: *const T = value; - let value: *const TrivialOrd = value as _; + let value: *const TrivialOrd = value.cast(); let value = unsafe { &*value }; self.set.take(value).map(|node| node.t) } @@ -298,7 +298,7 @@ mod btree_set { impl Borrow> for Node { fn borrow(&self) -> &TrivialOrd { let self_: *const T = &self.t; - let self_: *const TrivialOrd = self_ as _; + let self_: *const TrivialOrd = self_.cast(); unsafe { &*self_ } } } diff --git a/amadeus-types/src/lib.rs b/amadeus-types/src/lib.rs index 8c5fb714..f211ea1b 100644 --- a/amadeus-types/src/lib.rs +++ b/amadeus-types/src/lib.rs @@ -28,7 +28,8 @@ clippy::wildcard_imports, clippy::default_trait_access, clippy::inline_always, - clippy::too_many_lines + clippy::too_many_lines, + clippy::missing_panics_doc )] #![deny(unsafe_code)] diff --git a/amadeus-types/src/util.rs b/amadeus-types/src/util.rs index 3ae2d59c..c66f7b55 100644 --- a/amadeus-types/src/util.rs +++ b/amadeus-types/src/util.rs @@ -14,11 +14,11 @@ pub(crate) trait IteratorExt: Iterator { loop { let x = match self.next() { None => { - if other.next().is_none() { - return Ordering::Equal; + return if other.next().is_none() { + Ordering::Equal } else { - return Ordering::Less; - } + Ordering::Less + }; } Some(val) => val, }; @@ -45,11 +45,11 @@ pub(crate) trait IteratorExt: Iterator { loop { let x = match self.next() { None => { - if other.next().is_none() { - return Some(Ordering::Equal); + return if other.next().is_none() { + Some(Ordering::Equal) } else { - return Some(Ordering::Less); - } + Some(Ordering::Less) + }; } Some(val) => val, }; diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 90d3bf3b..fcf2892b 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -20,7 +20,7 @@ jobs: endpoint: alecmocatta default: rust_toolchain: nightly - rust_lint_toolchain: nightly-2020-08-17 + rust_lint_toolchain: nightly-2021-03-25 rust_flags: '' rust_features_clippy: ';aws;commoncrawl;parquet;postgres;csv;json;constellation aws commoncrawl parquet postgres csv json bench' rust_features_miri: 'aws commoncrawl parquet postgres csv json' @@ -53,7 +53,7 @@ jobs: endpoint: alecmocatta default: rust_toolchain: stable - rust_lint_toolchain: nightly-2020-08-17 + rust_lint_toolchain: nightly-2021-03-25 rust_flags: '' rust_features_clippy: ';aws;commoncrawl;postgres;csv;json;aws commoncrawl postgres csv json' rust_features: 'aws commoncrawl postgres csv json' @@ -79,7 +79,7 @@ jobs: endpoint: alecmocatta default: rust_toolchain: nightly - rust_lint_toolchain: nightly-2020-08-17 + rust_lint_toolchain: nightly-2021-03-25 rust_flags: '' rust_packages: '-p amadeus-core -p amadeus-derive -p amadeus-parquet -p amadeus-serde -p amadeus-types -p amadeus' rust_features_clippy: ';parquet;csv;json;parquet csv json' @@ -104,7 +104,7 @@ jobs: endpoint: alecmocatta default: rust_toolchain: stable nightly - rust_lint_toolchain: nightly-2020-08-17 + rust_lint_toolchain: nightly-2021-03-25 rust_flags: '' rust_packages: '-p amadeus-core -p amadeus-derive -p amadeus-serde -p amadeus-types -p amadeus' rust_features_clippy: ';csv;json;csv json' diff --git a/src/lib.rs b/src/lib.rs index c7dbafa6..81f8fc2d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,7 +26,8 @@ clippy::if_not_else, clippy::must_use_candidate, clippy::missing_errors_doc, - clippy::missing_safety_doc + clippy::missing_safety_doc, + clippy::let_underscore_drop )] #![deny(unsafe_code)] diff --git a/src/pool/thread.rs b/src/pool/thread.rs index cb92d207..4f37fdb7 100644 --- a/src/pool/thread.rs +++ b/src/pool/thread.rs @@ -261,7 +261,7 @@ mod pool { #[allow(deprecated)] res.map(|t| { let t: *mut dyn Any = Box::into_raw(t); - *Box::from_raw(t as *mut T) + *Box::from_raw(t.cast()) }) .map_err(JoinError::panic) } From 90ce3ab7005b77b59a7e7b3d4119be1129632598 Mon Sep 17 00:00:00 2001 From: alecmocatta Date: Mon, 5 Apr 2021 13:06:46 +0100 Subject: [PATCH 10/11] bump fmt --- .rustfmt.toml | 2 +- amadeus-core/src/par_sink/combiner.rs | 6 +++++- amadeus-core/src/par_sink/folder.rs | 2 +- amadeus-core/src/par_sink/tuple.rs | 10 +++++++++- amadeus-parquet/src/internal/file/statistics.rs | 4 ++-- amadeus-parquet/src/internal/util/bit_util.rs | 4 ++-- 6 files changed, 20 insertions(+), 8 deletions(-) diff --git a/.rustfmt.toml b/.rustfmt.toml index 35945acb..ce60b5be 100644 --- a/.rustfmt.toml +++ b/.rustfmt.toml @@ -1,6 +1,6 @@ hard_tabs = true imports_layout = "Horizontal" -merge_imports = true +imports_granularity = "Crate" fn_args_layout = "Compressed" use_field_init_shorthand = true diff --git a/amadeus-core/src/par_sink/combiner.rs b/amadeus-core/src/par_sink/combiner.rs index d08fd061..a71d3ec3 100644 --- a/amadeus-core/src/par_sink/combiner.rs +++ b/amadeus-core/src/par_sink/combiner.rs @@ -1,4 +1,8 @@ -#![allow(unused_imports, clippy::single_component_path_imports, clippy::option_if_let_else)] +#![allow( + unused_imports, + clippy::single_component_path_imports, + clippy::option_if_let_else +)] use super::FolderSync; diff --git a/amadeus-core/src/par_sink/folder.rs b/amadeus-core/src/par_sink/folder.rs index a77d29e1..ba0b9425 100644 --- a/amadeus-core/src/par_sink/folder.rs +++ b/amadeus-core/src/par_sink/folder.rs @@ -1,4 +1,4 @@ -#![allow(unused_imports,clippy::single_component_path_imports)] +#![allow(unused_imports, clippy::single_component_path_imports)] use derive_new::new; use educe::Educe; diff --git a/amadeus-core/src/par_sink/tuple.rs b/amadeus-core/src/par_sink/tuple.rs index 320c6c73..027f405e 100644 --- a/amadeus-core/src/par_sink/tuple.rs +++ b/amadeus-core/src/par_sink/tuple.rs @@ -1,4 +1,12 @@ -#![allow(non_snake_case, clippy::type_complexity, irrefutable_let_patterns, clippy::new_without_default, unused_mut, unreachable_code, clippy::too_many_arguments)] +#![allow( + non_snake_case, + clippy::type_complexity, + irrefutable_let_patterns, + clippy::new_without_default, + unused_mut, + unreachable_code, + clippy::too_many_arguments +)] use derive_new::new; use futures::{pin_mut, ready, stream, Stream, StreamExt}; diff --git a/amadeus-parquet/src/internal/file/statistics.rs b/amadeus-parquet/src/internal/file/statistics.rs index 848bbf5b..bcabb138 100644 --- a/amadeus-parquet/src/internal/file/statistics.rs +++ b/amadeus-parquet/src/internal/file/statistics.rs @@ -72,8 +72,8 @@ macro_rules! statistics_enum_func { Statistics::Double(ref typed) => typed.$func(), Statistics::ByteArray(ref typed) => typed.$func(), Statistics::FixedLenByteArray(ref typed) => typed.$func(), - } - }}; + } + }}; } /// Converts Thrift definition into `Statistics`. diff --git a/amadeus-parquet/src/internal/util/bit_util.rs b/amadeus-parquet/src/internal/util/bit_util.rs index 6ad64822..6667d20e 100644 --- a/amadeus-parquet/src/internal/util/bit_util.rs +++ b/amadeus-parquet/src/internal/util/bit_util.rs @@ -33,9 +33,9 @@ macro_rules! read_num_bytes { let mut data: $ty = Default::default(); unsafe { ::std::ptr::copy_nonoverlapping($src.as_ptr(), &mut data as *mut $ty as *mut u8, $size); - } + } data - }}; + }}; } /// Converts value `val` of type `T` to a byte vector, by reading `num_bytes` from `val`. From bc57c1e90700bf3264a6ac098bcec0711ec69cda Mon Sep 17 00:00:00 2001 From: alecmocatta Date: Mon, 5 Apr 2021 13:17:46 +0100 Subject: [PATCH 11/11] clippy fixes --- README.md | 4 ++-- amadeus-core/src/lib.rs | 3 ++- amadeus-core/src/pipe.rs | 2 +- amadeus-streaming/src/top.rs | 28 ++++++++++++++-------------- src/pool/thread.rs | 2 +- 5 files changed, 20 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index f945765d..03d0aa64 100644 --- a/README.md +++ b/README.md @@ -176,7 +176,7 @@ async fn main() -> Result<(), Box> { "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/", AwsCredentials::Anonymous, - ))) + )), None) .await?; // Note: this isn't yet implemented! @@ -222,7 +222,7 @@ fn main() -> Result<(), Box> { "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/", AwsCredentials::Anonymous, - ))) + )), None) .await?; let top_pages = rows diff --git a/amadeus-core/src/lib.rs b/amadeus-core/src/lib.rs index 27047f3c..08156d7d 100644 --- a/amadeus-core/src/lib.rs +++ b/amadeus-core/src/lib.rs @@ -36,7 +36,8 @@ clippy::needless_pass_by_value, clippy::unnecessary_wraps, clippy::missing_panics_doc, - clippy::let_underscore_drop + clippy::let_underscore_drop, + clippy::unnested_or_patterns )] #![deny(unsafe_code)] diff --git a/amadeus-core/src/pipe.rs b/amadeus-core/src/pipe.rs index 1fb20acf..714c6ca1 100644 --- a/amadeus-core/src/pipe.rs +++ b/amadeus-core/src/pipe.rs @@ -306,7 +306,7 @@ impl + Unpin, St: ?Sized + Stream + Unpin, I let items = &mut **self_.items; let mut given_all = false; let stream = stream::poll_fn(|cx| match Pin::new(&mut *items).poll_next(cx) { - x @ (Poll::Ready(Some(_)) | Poll::Pending) => x, + x @ Poll::Ready(Some(_)) | x @ Poll::Pending => x, Poll::Ready(None) => { given_all = true; Poll::Pending diff --git a/amadeus-streaming/src/top.rs b/amadeus-streaming/src/top.rs index 41b6bc4f..4a5efa92 100644 --- a/amadeus-streaming/src/top.rs +++ b/amadeus-streaming/src/top.rs @@ -281,38 +281,38 @@ mod test { #[derive(Serialize, Deserialize)] #[serde(bound = "")] - struct HLL(HyperLogLog); - impl Ord for HLL { + struct Hll(HyperLogLog); + impl Ord for Hll { #[inline(always)] fn cmp(&self, other: &Self) -> cmp::Ordering { self.0.len().partial_cmp(&other.0.len()).unwrap() } } - impl PartialOrd for HLL { + impl PartialOrd for Hll { #[inline(always)] fn partial_cmp(&self, other: &Self) -> Option { self.0.len().partial_cmp(&other.0.len()) } } - impl PartialEq for HLL { + impl PartialEq for Hll { #[inline(always)] fn eq(&self, other: &Self) -> bool { self.0.len().eq(&other.0.len()) } } - impl Eq for HLL {} - impl Clone for HLL { + impl Eq for Hll {} + impl Clone for Hll { fn clone(&self) -> Self { Self(self.0.clone()) } } - impl New for HLL { + impl New for Hll { type Config = f64; fn new(config: &Self::Config) -> Self { Self(New::new(config)) } } - impl Intersect for HLL { + impl Intersect for Hll { fn intersect<'a>(iter: impl Iterator) -> Option where Self: Sized + 'a, @@ -320,22 +320,22 @@ mod test { Intersect::intersect(iter.map(|x| &x.0)).map(Self) } } - impl<'a, V: Hash> UnionAssign<&'a HLL> for HLL { + impl<'a, V: Hash> UnionAssign<&'a Hll> for Hll { fn union_assign(&mut self, rhs: &'a Self) { self.0.union_assign(&rhs.0) } } - impl<'a, V: Hash> ops::AddAssign<&'a V> for HLL { + impl<'a, V: Hash> ops::AddAssign<&'a V> for Hll { fn add_assign(&mut self, rhs: &'a V) { self.0.add_assign(rhs) } } - impl Debug for HLL { + impl Debug for Hll { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { self.0.fmt(fmt) } } - impl IntersectPlusUnionIsPlus for HLL { + impl IntersectPlusUnionIsPlus for Hll { const VAL: bool = as IntersectPlusUnionIsPlus>::VAL; } @@ -344,7 +344,7 @@ mod test { fn top_hll() { let mut rng = rand::rngs::SmallRng::from_seed([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]); - let mut top = Top::>::new(1000, 0.99, 2.0 / 1000.0, 0.00408); + let mut top = Top::>::new(1000, 0.99, 2.0 / 1000.0, 0.00408); // let mut x = HashMap::new(); for _ in 0..5_000 { let (a, b) = (rng.gen_range(0, 2) == 0, rng.gen_range(0, 2) == 0); @@ -373,7 +373,7 @@ mod test { let mut rng = rand::rngs::SmallRng::from_seed([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]); - let mut top = Top::>::new(1000, 0.99, 2.0 / 1000.0, 0.05); + let mut top = Top::>::new(1000, 0.99, 2.0 / 1000.0, 0.05); // let mut x = HashMap::new(); for _ in 0..5_000_000 { let (a, b) = (rng.gen_range(0, 2) == 0, rng.gen_range(0, 2) == 0); diff --git a/src/pool/thread.rs b/src/pool/thread.rs index 4f37fdb7..35c388e9 100644 --- a/src/pool/thread.rs +++ b/src/pool/thread.rs @@ -109,7 +109,7 @@ impl ThreadPool { wasm_bindgen_futures::spawn_local(remote); Guard::new(remote_handle.map_ok(|t| { let t: *mut dyn Send = Box::into_raw(t); - *Box::from_raw(t as *mut T) + *Box::from_raw(t.cast::()) })) } }