Skip to content

Commit 76a4633

Browse files
committed
Add tests, implementation for MetadataLoader::load_absolute
1 parent b671545 commit 76a4633

File tree

2 files changed

+142
-5
lines changed

2 files changed

+142
-5
lines changed

parquet/src/arrow/async_reader/metadata.rs

Lines changed: 116 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,56 @@ impl<F: MetadataFetch> MetadataLoader<F> {
8989
file_size: usize,
9090
prefetch: Option<usize>,
9191
) -> Result<Self> {
92-
todo!()
92+
if file_size < 8 {
93+
return Err(ParquetError::EOF(format!(
94+
"file size of {file_size} is less than footer"
95+
)));
96+
}
97+
98+
// If a size hint is provided, read more than the minimum size
99+
// to try and avoid a second fetch.
100+
let footer_start = if let Some(size_hint) = prefetch {
101+
file_size.saturating_sub(size_hint)
102+
} else {
103+
file_size - 8
104+
};
105+
106+
let suffix = fetch.fetch((footer_start..file_size).into()).await?;
107+
let suffix_len = suffix.len();
108+
109+
let mut footer = [0; 8];
110+
footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]);
111+
112+
let length = decode_footer(&footer)?;
113+
114+
if file_size < length + 8 {
115+
return Err(ParquetError::EOF(format!(
116+
"file size of {} is less than footer + metadata {}",
117+
file_size,
118+
length + 8
119+
)));
120+
}
121+
122+
// Did not fetch the entire file metadata in the initial read, need to make a second request
123+
let (metadata, remainder) = if length > suffix_len - 8 {
124+
let metadata_start = file_size - length - 8;
125+
let meta = fetch.fetch((metadata_start..file_size - 8).into()).await?;
126+
(decode_metadata(&meta)?, None)
127+
} else {
128+
let metadata_offset = length + 8;
129+
let metadata_start = suffix_len - metadata_offset;
130+
131+
let slice = &suffix[metadata_start..suffix_len - 8];
132+
(
133+
decode_metadata(slice)?,
134+
Some((0, suffix.slice(..metadata_start))),
135+
)
136+
};
137+
Ok(Self {
138+
fetch,
139+
metadata,
140+
remainder,
141+
})
93142
}
94143

95144
/// Create a new [`MetadataLoader`] from an existing [`ParquetMetaData`]
@@ -245,7 +294,7 @@ mod tests {
245294
GetRange::Bounded(range) => range,
246295
GetRange::Offset(offset) => offset..file_size,
247296
GetRange::Suffix(end_offset) => {
248-
(file_size.saturating_sub(end_offset.try_into().unwrap())..file_size)
297+
file_size.saturating_sub(end_offset.try_into().unwrap())..file_size
249298
}
250299
};
251300
file.seek(SeekFrom::Start(range.start as _))?;
@@ -268,7 +317,14 @@ mod tests {
268317
fetch_count.fetch_add(1, Ordering::SeqCst);
269318
futures::future::ready(read_range(&mut file, range))
270319
};
320+
// Known file size, unknown metadata size
321+
let actual = fetch_parquet_metadata(&mut fetch, Some(len), None)
322+
.await
323+
.unwrap();
324+
assert_eq!(actual.file_metadata().schema(), expected);
325+
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
271326

327+
fetch_count.store(0, Ordering::SeqCst);
272328
let actual = fetch_parquet_metadata(&mut fetch, None, None)
273329
.await
274330
.unwrap();
@@ -369,5 +425,63 @@ mod tests {
369425
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
370426
let metadata = loader.finish();
371427
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
428+
429+
// Known-size file
430+
fetch_count.store(0, Ordering::SeqCst);
431+
let f = MetadataFetchFn(&mut fetch);
432+
let mut loader = MetadataLoader::load_absolute(f, len, None).await.unwrap();
433+
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
434+
loader.load_page_index(true, true).await.unwrap();
435+
assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
436+
let metadata = loader.finish();
437+
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
438+
439+
// Prefetch just footer exactly
440+
fetch_count.store(0, Ordering::SeqCst);
441+
let f = MetadataFetchFn(&mut fetch);
442+
let mut loader = MetadataLoader::load_absolute(f, len, Some(1729))
443+
.await
444+
.unwrap();
445+
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
446+
loader.load_page_index(true, true).await.unwrap();
447+
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
448+
let metadata = loader.finish();
449+
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
450+
451+
// Prefetch more than footer but not enough
452+
fetch_count.store(0, Ordering::SeqCst);
453+
let f = MetadataFetchFn(&mut fetch);
454+
let mut loader = MetadataLoader::load_absolute(f, len, Some(130649))
455+
.await
456+
.unwrap();
457+
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
458+
loader.load_page_index(true, true).await.unwrap();
459+
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
460+
let metadata = loader.finish();
461+
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
462+
463+
// Prefetch exactly enough
464+
fetch_count.store(0, Ordering::SeqCst);
465+
let f = MetadataFetchFn(&mut fetch);
466+
let mut loader = MetadataLoader::load_absolute(f, len, Some(130650))
467+
.await
468+
.unwrap();
469+
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
470+
loader.load_page_index(true, true).await.unwrap();
471+
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
472+
let metadata = loader.finish();
473+
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
474+
475+
// Prefetch more than enough
476+
fetch_count.store(0, Ordering::SeqCst);
477+
let f = MetadataFetchFn(&mut fetch);
478+
let mut loader = MetadataLoader::load_absolute(f, len, Some(131651))
479+
.await
480+
.unwrap();
481+
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
482+
loader.load_page_index(true, true).await.unwrap();
483+
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
484+
let metadata = loader.finish();
485+
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
372486
}
373487
}

parquet/src/arrow/async_reader/store.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ pub struct ParquetObjectReader {
7070
store: Arc<dyn ObjectStore>,
7171
location: Path,
7272
metadata_size_hint: Option<usize>,
73+
file_size: Option<usize>,
7374
preload_column_index: bool,
7475
preload_offset_index: bool,
7576
}
@@ -82,12 +83,20 @@ impl ParquetObjectReader {
8283
Self {
8384
store,
8485
location,
86+
file_size: None,
8587
metadata_size_hint: None,
8688
preload_column_index: false,
8789
preload_offset_index: false,
8890
}
8991
}
9092

93+
/// Provide the size of the file, for object stores that do not support suffix ranges (e.g. Azure)
94+
pub fn with_file_size(self, file_size: usize) -> Self {
95+
Self {
96+
file_size: Some(file_size),
97+
..self
98+
}
99+
}
91100
/// Provide a hint as to the size of the parquet file's footer,
92101
/// see [fetch_parquet_metadata](crate::arrow::async_reader::fetch_parquet_metadata)
93102
pub fn with_footer_size_hint(self, hint: usize) -> Self {
@@ -151,9 +160,10 @@ impl AsyncFileReader for ParquetObjectReader {
151160
let preload_column_index = self.preload_column_index;
152161
let preload_offset_index = self.preload_offset_index;
153162
let prefetch = self.metadata_size_hint;
154-
// TODO: distinguish between suffix-supporting object stores, and those that don't
155-
// Alternatively, surface in the form of a flag on this struct.
156-
let mut loader = MetadataLoader::load(self, prefetch).await?;
163+
let mut loader = match self.file_size {
164+
Some(file_size) => MetadataLoader::load_absolute(self, file_size, prefetch).await,
165+
None => MetadataLoader::load(self, prefetch).await,
166+
}?;
157167
loader
158168
.load_page_index(preload_column_index, preload_offset_index)
159169
.await?;
@@ -192,6 +202,19 @@ mod tests {
192202
assert_eq!(batches.len(), 1);
193203
assert_eq!(batches[0].num_rows(), 8);
194204

205+
let meta = store.head(&location).await.unwrap();
206+
let file_size = meta.size;
207+
208+
let object_reader = ParquetObjectReader::new(Arc::clone(&store), location.clone())
209+
.with_file_size(file_size);
210+
let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
211+
.await
212+
.unwrap();
213+
let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();
214+
215+
assert_eq!(batches.len(), 1);
216+
assert_eq!(batches[0].num_rows(), 8);
217+
195218
location = Path::from("I don't exist.parquet");
196219

197220
let object_reader = ParquetObjectReader::new(store, location);

0 commit comments

Comments
 (0)