Skip to content

Commit 73cd820

Browse files
committed
cargo fmt
1 parent c3e8e44 commit 73cd820

File tree

3 files changed

+36
-16
lines changed

3 files changed

+36
-16
lines changed

parquet/src/arrow/async_reader/metadata.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ impl<F: MetadataFetch> MetadataLoader<F> {
5252
///
5353
/// See [`fetch_parquet_metadata`] for the meaning of the individual parameters
5454
pub async fn load(mut fetch: F, prefetch: Option<usize>) -> Result<Self> {
55-
5655
let suffix = fetch.fetch(GetRange::Suffix(prefetch.unwrap_or(8))).await?;
5756
let suffix_len = suffix.len();
5857

@@ -85,7 +84,11 @@ impl<F: MetadataFetch> MetadataLoader<F> {
8584
})
8685
}
8786

88-
pub async fn load_absolute(mut fetch: F, file_size: usize, prefetch: Option<usize>) -> Result<Self> {
87+
pub async fn load_absolute(
88+
mut fetch: F,
89+
file_size: usize,
90+
prefetch: Option<usize>,
91+
) -> Result<Self> {
8992
todo!()
9093
}
9194

@@ -222,7 +225,7 @@ where
222225
let fetch = MetadataFetchFn(fetch);
223226
let loader = match file_size {
224227
Some(file_size) => MetadataLoader::load_absolute(fetch, file_size, prefetch).await?,
225-
None => MetadataLoader::load(fetch, prefetch).await?
228+
None => MetadataLoader::load(fetch, prefetch).await?,
226229
};
227230
Ok(loader.finish())
228231
}
@@ -241,7 +244,9 @@ mod tests {
241244
let range = match range {
242245
GetRange::Bounded(range) => range,
243246
GetRange::Offset(offset) => offset..file_size,
244-
GetRange::Suffix(end_offset) => (file_size.saturating_sub(end_offset.try_into().unwrap())..file_size)
247+
GetRange::Suffix(end_offset) => {
248+
(file_size.saturating_sub(end_offset.try_into().unwrap())..file_size)
249+
}
245250
};
246251
file.seek(SeekFrom::Start(range.start as _))?;
247252
let len = range.end - range.start;
@@ -264,7 +269,9 @@ mod tests {
264269
futures::future::ready(read_range(&mut file, range))
265270
};
266271

267-
let actual = fetch_parquet_metadata(&mut fetch, None, None).await.unwrap();
272+
let actual = fetch_parquet_metadata(&mut fetch, None, None)
273+
.await
274+
.unwrap();
268275
assert_eq!(actual.file_metadata().schema(), expected);
269276
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
270277

@@ -352,7 +359,7 @@ mod tests {
352359
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
353360
let metadata = loader.finish();
354361
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
355-
362+
356363
// Prefetch more than enough
357364
fetch_count.store(0, Ordering::SeqCst);
358365
let f = MetadataFetchFn(&mut fetch);

parquet/src/arrow/async_reader/mod.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -200,11 +200,11 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
200200
GetRange::Suffix(end_offset) => {
201201
self.seek(SeekFrom::End(-(end_offset as i64))).await?;
202202
Some(end_offset)
203-
},
203+
}
204204
GetRange::Offset(offset) => {
205205
self.seek(SeekFrom::Start(offset as u64)).await?;
206206
None
207-
},
207+
}
208208
GetRange::Bounded(range) => {
209209
self.seek(SeekFrom::Start(range.start as u64)).await?;
210210
Some(range.end - range.start)
@@ -406,7 +406,10 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
406406
};
407407

408408
let buffer = match column_metadata.bloom_filter_length() {
409-
Some(length) => self.input.0.get_bytes((offset..offset + length as usize).into()),
409+
Some(length) => self
410+
.input
411+
.0
412+
.get_bytes((offset..offset + length as usize).into()),
410413
None => self
411414
.input
412415
.0
@@ -441,7 +444,9 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
441444
})?;
442445
self.input
443446
.0
444-
.get_bytes((bitset_offset as usize..bitset_offset as usize + bitset_length).into())
447+
.get_bytes(
448+
(bitset_offset as usize..bitset_offset as usize + bitset_length).into(),
449+
)
445450
.await?
446451
}
447452
};
@@ -989,7 +994,9 @@ mod tests {
989994
let range = match range {
990995
GetRange::Bounded(range) => range,
991996
GetRange::Offset(offset) => offset..self.data.len(),
992-
GetRange::Suffix(end_offset) => self.data.len().saturating_sub(end_offset)..self.data.len()
997+
GetRange::Suffix(end_offset) => {
998+
self.data.len().saturating_sub(end_offset)..self.data.len()
999+
}
9931000
};
9941001
futures::future::ready(Ok(self.data.slice(range))).boxed()
9951002
}

parquet/src/arrow/async_reader/store.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use futures::future::BoxFuture;
2323
use futures::{FutureExt, TryFutureExt};
2424

2525
use object_store::GetOptions;
26-
use object_store::{ObjectStore, path::Path};
26+
use object_store::{path::Path, ObjectStore};
2727

2828
use crate::arrow::async_reader::{AsyncFileReader, MetadataLoader};
2929
use crate::errors::Result;
@@ -118,10 +118,16 @@ impl AsyncFileReader for ParquetObjectReader {
118118
fn get_bytes(&mut self, range: GetRange) -> BoxFuture<'_, Result<Bytes>> {
119119
async move {
120120
self.store
121-
.get_opts(&self.location, GetOptions {
122-
range: Some(range.into()),
123-
..Default::default()
124-
}).await?.bytes().await
121+
.get_opts(
122+
&self.location,
123+
GetOptions {
124+
range: Some(range.into()),
125+
..Default::default()
126+
},
127+
)
128+
.await?
129+
.bytes()
130+
.await
125131
}
126132
.map_err(|e| e.into())
127133
.boxed()

0 commit comments

Comments
 (0)