clippy fixes
alecmocatta committed Apr 5, 2021
1 parent b5a3ab0 commit 1972489
Showing 3 changed files with 19 additions and 18 deletions.
src/helpers/filter_nulls_and_unwrap.rs (5 changes: 3 additions & 2 deletions)
@@ -1,3 +1,5 @@
+#![allow(clippy::type_complexity, clippy::option_option)]
+
 use crate::par_stream::{Filter, Map};
 use amadeus_core::par_stream::ParallelStream;
 
@@ -17,8 +19,7 @@ where
 	fn filter_nulls_and_unwrap(
 		self,
 	) -> Map<Filter<T, OptionFilterNullHandler>, fn(Option<O>) -> O> {
-		self.filter(OptionFilterNullHandler {})
-			.map(|unwrapped_value: Option<O>| unwrapped_value.unwrap())
+		self.filter(OptionFilterNullHandler {}).map(Option::unwrap)
 	}
 }
 
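The change in this file addresses clippy's redundant_closure lint: a closure whose body only calls `.unwrap()` on its argument can be replaced by the function path `Option::unwrap`. The new module-level `#![allow(clippy::type_complexity, clippy::option_option)]` presumably silences the lints that the module's nested-`Option` handling and its long `Map<Filter<...>>` return type would otherwise trigger. A minimal standalone sketch of the closure fix on plain iterators (illustrative names only, not the crate's `ParallelStream` API):

```rust
fn main() {
    let values: Vec<Option<u32>> = vec![Some(1), Some(2), Some(3)];

    // Before: a closure that only forwards its argument to `.unwrap()`.
    let before: Vec<u32> = values
        .clone()
        .into_iter()
        .map(|v: Option<u32>| v.unwrap())
        .collect();

    // After: pass the function itself, as clippy::redundant_closure suggests.
    let after: Vec<u32> = values.into_iter().map(Option::unwrap).collect();

    assert_eq!(before, after);
}
```
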
src/pool/util.rs (6 changes: 5 additions & 1 deletion)
@@ -89,7 +89,11 @@ impl Synchronize {
 		F: Future<Output = ()>,
 	{
 		let nonce = self.nonce.load(Ordering::SeqCst);
-		if !self.running.compare_and_swap(false, true, Ordering::SeqCst) {
+		if self
+			.running
+			.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
+			.is_ok()
+		{
 			let on_drop = OnDrop::new(|| self.running.store(false, Ordering::SeqCst));
 			f.await;
 			on_drop.cancel();
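
`compare_and_swap` on the standard atomics was deprecated in Rust 1.50 in favor of `compare_exchange`. The old call returned the previous value, so `!running.compare_and_swap(false, true, SeqCst)` was true exactly when the flag had been `false` and this caller flipped it; the same check is now written as `compare_exchange(false, true, SeqCst, SeqCst).is_ok()`. A standalone sketch of the pattern, independent of the crate's `Synchronize` and `OnDrop` types:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Returns true if this caller won the race to set the flag.
fn try_acquire(running: &AtomicBool) -> bool {
    // Old (deprecated): `!running.compare_and_swap(false, true, Ordering::SeqCst)`.
    // New: the swap succeeds only if the current value equals `false`.
    running
        .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
        .is_ok()
}

fn main() {
    let running = AtomicBool::new(false);
    assert!(try_acquire(&running)); // first caller sets the flag
    assert!(!try_acquire(&running)); // flag is already set
    running.store(false, Ordering::SeqCst); // release, as the OnDrop guard does in the diff
}
```
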
tests/parquet.rs (26 changes: 11 additions & 15 deletions)
@@ -8,10 +8,9 @@
 use std::{collections::HashMap, path::PathBuf, time::SystemTime};
 
 use amadeus::prelude::*;
-use amadeus_parquet::get_row_predicate;
 
 fn assert_columns_in_row<T>(
-	expected_column_names: &Vec<String>, row: Result<Value, T>,
+	expected_column_names: &[String], row: Result<Value, T>,
 ) -> Result<Value, T> {
 	let row_unwrapped = row.ok().unwrap();
 	let row_group = row_unwrapped.clone().into_group().ok().unwrap();
@@ -38,20 +37,17 @@ async fn dynamic_reads() {
 
 	let column_names = vec!["uri".to_string(), "location".to_string()];
 
-	#[cfg(feature = "aws")]
-	{
-		let rows = Parquet::<_, Value>::new(vec![
-			S3File::new_with(AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/year=2018/month=11/day=02/part-00176-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet", AwsCredentials::Anonymous),
-		], get_row_predicate(column_names.clone())).await.unwrap();
+	let rows = Parquet::<_, Value>::new(vec![
+		PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=02/part-00176-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet"),
+	], amadeus_parquet::get_row_predicate(column_names.clone())).await.unwrap();
 
-		assert_eq!(
-			rows.par_stream()
-				.map(move |row: Result<Value, _>| assert_columns_in_row(&column_names, row))
-				.count(pool)
-				.await,
-			4172
-		);
-	}
+	assert_eq!(
+		rows.par_stream()
+			.map(move |row: Result<Value, _>| assert_columns_in_row(&column_names, row))
+			.count(pool)
+			.await,
+		4172
+	);
 
 	println!("in {:?}", start.elapsed().unwrap());
 }
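
Changing the parameter from `&Vec<String>` to `&[String]` follows clippy's ptr_arg lint: a slice accepts any contiguous sequence of `String`s, and a `&Vec<String>` argument coerces to `&[String]`, so existing call sites compile unchanged. A small sketch with a hypothetical helper (not the test's `assert_columns_in_row`):

```rust
// Taking `&[String]` instead of `&Vec<String>` is strictly more general.
fn contains_column(expected_column_names: &[String], name: &str) -> bool {
    expected_column_names.iter().any(|c| c == name)
}

fn main() {
    let columns = vec!["uri".to_string(), "location".to_string()];
    // `&Vec<String>` coerces to `&[String]`, so the caller is unchanged.
    assert!(contains_column(&columns, "uri"));
    assert!(!contains_column(&columns, "status"));
}
```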
