diff --git a/src/helpers/filter_nulls_and_unwrap.rs b/src/helpers/filter_nulls_and_unwrap.rs index 440a7a78..5435d8d2 100644 --- a/src/helpers/filter_nulls_and_unwrap.rs +++ b/src/helpers/filter_nulls_and_unwrap.rs @@ -1,3 +1,5 @@ +#![allow(clippy::type_complexity, clippy::option_option)] + use crate::par_stream::{Filter, Map}; use amadeus_core::par_stream::ParallelStream; @@ -17,8 +19,7 @@ where fn filter_nulls_and_unwrap( self, ) -> Map, fn(Option) -> O> { - self.filter(OptionFilterNullHandler {}) - .map(|unwrapped_value: Option| unwrapped_value.unwrap()) + self.filter(OptionFilterNullHandler {}).map(Option::unwrap) } } diff --git a/src/pool/util.rs b/src/pool/util.rs index af61c67d..7db0e9c0 100644 --- a/src/pool/util.rs +++ b/src/pool/util.rs @@ -89,7 +89,11 @@ impl Synchronize { F: Future, { let nonce = self.nonce.load(Ordering::SeqCst); - if !self.running.compare_and_swap(false, true, Ordering::SeqCst) { + if self + .running + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + { let on_drop = OnDrop::new(|| self.running.store(false, Ordering::SeqCst)); f.await; on_drop.cancel(); diff --git a/tests/parquet.rs b/tests/parquet.rs index 2925ede4..acedfe75 100644 --- a/tests/parquet.rs +++ b/tests/parquet.rs @@ -8,10 +8,9 @@ use std::{collections::HashMap, path::PathBuf, time::SystemTime}; use amadeus::prelude::*; -use amadeus_parquet::get_row_predicate; fn assert_columns_in_row( - expected_column_names: &Vec, row: Result, + expected_column_names: &[String], row: Result, ) -> Result { let row_unwrapped = row.ok().unwrap(); let row_group = row_unwrapped.clone().into_group().ok().unwrap(); @@ -38,20 +37,17 @@ async fn dynamic_reads() { let column_names = vec!["uri".to_string(), "location".to_string()]; - #[cfg(feature = "aws")] - { - let rows = Parquet::<_, Value>::new(vec![ - S3File::new_with(AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/year=2018/month=11/day=02/part-00176-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet", AwsCredentials::Anonymous), - ], get_row_predicate(column_names.clone())).await.unwrap(); + let rows = Parquet::<_, Value>::new(vec![ + PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=02/part-00176-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet"), + ], amadeus_parquet::get_row_predicate(column_names.clone())).await.unwrap(); - assert_eq!( - rows.par_stream() - .map(move |row: Result| assert_columns_in_row(&column_names, row)) - .count(pool) - .await, - 4172 - ); - } + assert_eq!( + rows.par_stream() + .map(move |row: Result| assert_columns_in_row(&column_names, row)) + .count(pool) + .await, + 4172 + ); println!("in {:?}", start.elapsed().unwrap()); }