Skip to content

Commit c3e8e44

Browse files
committed
Parquet/async: Swap default MetadataLoader::load behaviour to suffix requests, swap get_bytes to take a GetRange (equivalent to object-store's)
1 parent bf9ce47 commit c3e8e44

File tree

3 files changed

+165
-92
lines changed

3 files changed

+165
-92
lines changed

parquet/src/arrow/async_reader/metadata.rs

Lines changed: 57 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::arrow::async_reader::AsyncFileReader;
18+
use crate::arrow::async_reader::{AsyncFileReader, GetRange};
1919
use crate::errors::{ParquetError, Result};
2020
use crate::file::footer::{decode_footer, decode_metadata};
2121
use crate::file::metadata::ParquetMetaData;
@@ -25,15 +25,14 @@ use bytes::Bytes;
2525
use futures::future::BoxFuture;
2626
use futures::FutureExt;
2727
use std::future::Future;
28-
use std::ops::Range;
2928

3029
/// A data source that can be used with [`MetadataLoader`] to load [`ParquetMetaData`]
3130
pub trait MetadataFetch {
32-
fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
31+
fn fetch(&mut self, range: GetRange) -> BoxFuture<'_, Result<Bytes>>;
3332
}
3433

3534
impl<'a, T: AsyncFileReader> MetadataFetch for &'a mut T {
36-
fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
35+
fn fetch(&mut self, range: GetRange) -> BoxFuture<'_, Result<Bytes>> {
3736
self.get_bytes(range)
3837
}
3938
}
@@ -52,49 +51,30 @@ impl<F: MetadataFetch> MetadataLoader<F> {
5251
/// Create a new [`MetadataLoader`] by reading the footer information
5352
///
5453
/// See [`fetch_parquet_metadata`] for the meaning of the individual parameters
55-
pub async fn load(mut fetch: F, file_size: usize, prefetch: Option<usize>) -> Result<Self> {
56-
if file_size < 8 {
57-
return Err(ParquetError::EOF(format!(
58-
"file size of {file_size} is less than footer"
59-
)));
60-
}
54+
pub async fn load(mut fetch: F, prefetch: Option<usize>) -> Result<Self> {
6155

62-
// If a size hint is provided, read more than the minimum size
63-
// to try and avoid a second fetch.
64-
let footer_start = if let Some(size_hint) = prefetch {
65-
file_size.saturating_sub(size_hint)
66-
} else {
67-
file_size - 8
68-
};
69-
70-
let suffix = fetch.fetch(footer_start..file_size).await?;
56+
let suffix = fetch.fetch(GetRange::Suffix(prefetch.unwrap_or(8))).await?;
7157
let suffix_len = suffix.len();
7258

7359
let mut footer = [0; 8];
7460
footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]);
7561

7662
let length = decode_footer(&footer)?;
7763

78-
if file_size < length + 8 {
79-
return Err(ParquetError::EOF(format!(
80-
"file size of {} is less than footer + metadata {}",
81-
file_size,
82-
length + 8
83-
)));
84-
}
85-
8664
// Did not fetch the entire file metadata in the initial read, need to make a second request
8765
let (metadata, remainder) = if length > suffix_len - 8 {
88-
let metadata_start = file_size - length - 8;
89-
let meta = fetch.fetch(metadata_start..file_size - 8).await?;
90-
(decode_metadata(&meta)?, None)
66+
let metadata_offset = length + 8;
67+
let meta = fetch.fetch(GetRange::Suffix(metadata_offset)).await?;
68+
let slice = &meta[0..length];
69+
(decode_metadata(&slice)?, None)
9170
} else {
92-
let metadata_start = file_size - length - 8 - footer_start;
71+
let metadata_offset = length + 8;
72+
let metadata_start = suffix_len - metadata_offset;
9373

9474
let slice = &suffix[metadata_start..suffix_len - 8];
9575
(
9676
decode_metadata(slice)?,
97-
Some((footer_start, suffix.slice(..metadata_start))),
77+
Some((0, suffix.slice(..metadata_start))),
9878
)
9979
};
10080

@@ -105,6 +85,10 @@ impl<F: MetadataFetch> MetadataLoader<F> {
10585
})
10686
}
10787

88+
pub async fn load_absolute(mut fetch: F, file_size: usize, prefetch: Option<usize>) -> Result<Self> {
89+
todo!()
90+
}
91+
10892
/// Create a new [`MetadataLoader`] from an existing [`ParquetMetaData`]
10993
pub fn new(fetch: F, metadata: ParquetMetaData) -> Self {
11094
Self {
@@ -133,13 +117,15 @@ impl<F: MetadataFetch> MetadataLoader<F> {
133117
Some(range) => range,
134118
};
135119

120+
let page_index_len = range.end - range.start;
121+
// TODO: determine if _remainder_start is needed even in the non-suffix request case
136122
let data = match &self.remainder {
137-
Some((remainder_start, remainder)) if *remainder_start <= range.start => {
138-
let offset = range.start - *remainder_start;
139-
remainder.slice(offset..range.end - *remainder_start + offset)
123+
Some((_remainder_start, remainder)) if remainder.len() >= page_index_len => {
124+
let offset = remainder.len() - page_index_len;
125+
remainder.slice(offset..)
140126
}
141127
// Note: this will potentially fetch data already in remainder, this keeps things simple
142-
_ => self.fetch.fetch(range.start..range.end).await?,
128+
_ => self.fetch.fetch((range.start..range.end).into()).await?,
143129
};
144130

145131
// Sanity check
@@ -200,10 +186,10 @@ struct MetadataFetchFn<F>(F);
200186

201187
impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
202188
where
203-
F: FnMut(Range<usize>) -> Fut + Send,
189+
F: FnMut(GetRange) -> Fut + Send,
204190
Fut: Future<Output = Result<Bytes>> + Send,
205191
{
206-
fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
192+
fn fetch(&mut self, range: GetRange) -> BoxFuture<'_, Result<Bytes>> {
207193
async move { self.0(range).await }.boxed()
208194
}
209195
}
@@ -226,15 +212,18 @@ where
226212
/// significantly reduce the number of `fetch` requests, and consequently latency
227213
pub async fn fetch_parquet_metadata<F, Fut>(
228214
fetch: F,
229-
file_size: usize,
215+
file_size: Option<usize>,
230216
prefetch: Option<usize>,
231217
) -> Result<ParquetMetaData>
232218
where
233-
F: FnMut(Range<usize>) -> Fut + Send,
219+
F: FnMut(GetRange) -> Fut + Send,
234220
Fut: Future<Output = Result<Bytes>> + Send,
235221
{
236222
let fetch = MetadataFetchFn(fetch);
237-
let loader = MetadataLoader::load(fetch, file_size, prefetch).await?;
223+
let loader = match file_size {
224+
Some(file_size) => MetadataLoader::load_absolute(fetch, file_size, prefetch).await?,
225+
None => MetadataLoader::load(fetch, prefetch).await?
226+
};
238227
Ok(loader.finish())
239228
}
240229

@@ -247,7 +236,13 @@ mod tests {
247236
use std::io::{Read, Seek, SeekFrom};
248237
use std::sync::atomic::{AtomicUsize, Ordering};
249238

250-
fn read_range(file: &mut File, range: Range<usize>) -> Result<Bytes> {
239+
fn read_range(file: &mut File, range: GetRange) -> Result<Bytes> {
240+
let file_size = file.len().try_into().unwrap();
241+
let range = match range {
242+
GetRange::Bounded(range) => range,
243+
GetRange::Offset(offset) => offset..file_size,
244+
GetRange::Suffix(end_offset) => (file_size.saturating_sub(end_offset.try_into().unwrap())..file_size)
245+
};
251246
file.seek(SeekFrom::Start(range.start as _))?;
252247
let len = range.end - range.start;
253248
let mut buf = Vec::with_capacity(len);
@@ -269,41 +264,41 @@ mod tests {
269264
futures::future::ready(read_range(&mut file, range))
270265
};
271266

272-
let actual = fetch_parquet_metadata(&mut fetch, len, None).await.unwrap();
267+
let actual = fetch_parquet_metadata(&mut fetch, None, None).await.unwrap();
273268
assert_eq!(actual.file_metadata().schema(), expected);
274269
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
275270

276271
// Metadata hint too small
277272
fetch_count.store(0, Ordering::SeqCst);
278-
let actual = fetch_parquet_metadata(&mut fetch, len, Some(10))
273+
let actual = fetch_parquet_metadata(&mut fetch, None, Some(10))
279274
.await
280275
.unwrap();
281276
assert_eq!(actual.file_metadata().schema(), expected);
282277
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
283278

284279
// Metadata hint too large
285280
fetch_count.store(0, Ordering::SeqCst);
286-
let actual = fetch_parquet_metadata(&mut fetch, len, Some(500))
281+
let actual = fetch_parquet_metadata(&mut fetch, None, Some(500))
287282
.await
288283
.unwrap();
289284
assert_eq!(actual.file_metadata().schema(), expected);
290285
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
291286

292287
// Metadata hint exactly correct
293288
fetch_count.store(0, Ordering::SeqCst);
294-
let actual = fetch_parquet_metadata(&mut fetch, len, Some(428))
289+
let actual = fetch_parquet_metadata(&mut fetch, None, Some(428))
295290
.await
296291
.unwrap();
297292
assert_eq!(actual.file_metadata().schema(), expected);
298293
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
299294

300-
let err = fetch_parquet_metadata(&mut fetch, 4, None)
295+
let err = fetch_parquet_metadata(&mut fetch, Some(4), None)
301296
.await
302297
.unwrap_err()
303298
.to_string();
304299
assert_eq!(err, "EOF: file size of 4 is less than footer");
305300

306-
let err = fetch_parquet_metadata(&mut fetch, 20, None)
301+
let err = fetch_parquet_metadata(&mut fetch, Some(20), None)
307302
.await
308303
.unwrap_err()
309304
.to_string();
@@ -321,7 +316,7 @@ mod tests {
321316
};
322317

323318
let f = MetadataFetchFn(&mut fetch);
324-
let mut loader = MetadataLoader::load(f, len, None).await.unwrap();
319+
let mut loader = MetadataLoader::load(f, None).await.unwrap();
325320
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
326321
loader.load_page_index(true, true).await.unwrap();
327322
assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
@@ -331,7 +326,7 @@ mod tests {
331326
// Prefetch just footer exactly
332327
fetch_count.store(0, Ordering::SeqCst);
333328
let f = MetadataFetchFn(&mut fetch);
334-
let mut loader = MetadataLoader::load(f, len, Some(1729)).await.unwrap();
329+
let mut loader = MetadataLoader::load(f, Some(1729)).await.unwrap();
335330
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
336331
loader.load_page_index(true, true).await.unwrap();
337332
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
@@ -341,7 +336,7 @@ mod tests {
341336
// Prefetch more than footer but not enough
342337
fetch_count.store(0, Ordering::SeqCst);
343338
let f = MetadataFetchFn(&mut fetch);
344-
let mut loader = MetadataLoader::load(f, len, Some(130649)).await.unwrap();
339+
let mut loader = MetadataLoader::load(f, Some(130649)).await.unwrap();
345340
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
346341
loader.load_page_index(true, true).await.unwrap();
347342
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
@@ -351,7 +346,17 @@ mod tests {
351346
// Prefetch exactly enough
352347
fetch_count.store(0, Ordering::SeqCst);
353348
let f = MetadataFetchFn(&mut fetch);
354-
let mut loader = MetadataLoader::load(f, len, Some(130650)).await.unwrap();
349+
let mut loader = MetadataLoader::load(f, Some(130650)).await.unwrap();
350+
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
351+
loader.load_page_index(true, true).await.unwrap();
352+
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
353+
let metadata = loader.finish();
354+
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
355+
356+
// Prefetch more than enough
357+
fetch_count.store(0, Ordering::SeqCst);
358+
let f = MetadataFetchFn(&mut fetch);
359+
let mut loader = MetadataLoader::load(f, Some(131651)).await.unwrap();
355360
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
356361
loader.load_page_index(true, true).await.unwrap();
357362
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

0 commit comments

Comments
 (0)