Skip to content

Commit a3598a9

Browse files
authored
Parquet: Support reading Parquet metadata via suffix range requests (#7334)
* Support reading Parquet metadata via suffix requests
* Update doctest
* Add simple test without file length provided
* address comments
* Switch `file_size` optional arg to `with_file_size` method
* Move `prefetch` lookup into `load_metadata`
* Remove extra call to `max`
* fix doctest
* Add test that suffix requests are made
* more tests
1 parent 99b2b3b commit a3598a9

File tree

4 files changed

+300
-31
lines changed

4 files changed

+300
-31
lines changed

parquet/src/arrow/async_reader/metadata.rs

+10
Original file line number | Diff line number | Diff line change
@@ -75,6 +75,16 @@ impl<T: AsyncFileReader> MetadataFetch for &mut T {
7575
}
7676
}
7777

78+
/// A data source that can be used with [`MetadataLoader`] to load [`ParquetMetaData`] via suffix
79+
/// requests, without knowing the file size
80+
pub trait MetadataSuffixFetch: MetadataFetch {
81+
/// Return a future that fetches the last `n` bytes asynchronously
82+
///
83+
/// Note the returned type is a boxed future, often created by
84+
/// [FutureExt::boxed]. See the trait documentation for an example
85+
fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>>;
86+
}
87+
7888
/// An asynchronous interface to load [`ParquetMetaData`] from an async source
7989
pub struct MetadataLoader<F> {
8090
/// Function that fetches byte ranges asynchronously

parquet/src/arrow/async_reader/store.rs

+84-25
Original file line number | Diff line number | Diff line change
@@ -18,12 +18,13 @@
1818
use std::{ops::Range, sync::Arc};
1919

2020
use crate::arrow::arrow_reader::ArrowReaderOptions;
21-
use crate::arrow::async_reader::AsyncFileReader;
21+
use crate::arrow::async_reader::{AsyncFileReader, MetadataSuffixFetch};
2222
use crate::errors::{ParquetError, Result};
2323
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
2424
use bytes::Bytes;
2525
use futures::{future::BoxFuture, FutureExt, TryFutureExt};
26-
use object_store::{path::Path, ObjectMeta, ObjectStore};
26+
use object_store::{path::Path, ObjectStore};
27+
use object_store::{GetOptions, GetRange};
2728
use tokio::runtime::Handle;
2829

2930
/// Reads Parquet files in object storage using [`ObjectStore`].
@@ -45,29 +46,29 @@ use tokio::runtime::Handle;
4546
/// println!("Found Blob with {}B at {}", meta.size, meta.location);
4647
///
4748
/// // Show Parquet metadata
48-
/// let reader = ParquetObjectReader::new(storage_container, meta);
49+
/// let reader = ParquetObjectReader::new(storage_container, meta.location).with_file_size(meta.size);
4950
/// let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
5051
/// print_parquet_metadata(&mut stdout(), builder.metadata());
5152
/// # }
5253
/// ```
5354
#[derive(Clone, Debug)]
5455
pub struct ParquetObjectReader {
5556
store: Arc<dyn ObjectStore>,
56-
meta: ObjectMeta,
57+
path: Path,
58+
file_size: Option<usize>,
5759
metadata_size_hint: Option<usize>,
5860
preload_column_index: bool,
5961
preload_offset_index: bool,
6062
runtime: Option<Handle>,
6163
}
6264

6365
impl ParquetObjectReader {
64-
/// Creates a new [`ParquetObjectReader`] for the provided [`ObjectStore`] and [`ObjectMeta`]
65-
///
66-
/// [`ObjectMeta`] can be obtained using [`ObjectStore::list`] or [`ObjectStore::head`]
67-
pub fn new(store: Arc<dyn ObjectStore>, meta: ObjectMeta) -> Self {
66+
/// Creates a new [`ParquetObjectReader`] for the provided [`ObjectStore`] and [`Path`].
67+
pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
6868
Self {
6969
store,
70-
meta,
70+
path,
71+
file_size: None,
7172
metadata_size_hint: None,
7273
preload_column_index: false,
7374
preload_offset_index: false,
@@ -84,6 +85,22 @@ impl ParquetObjectReader {
8485
}
8586
}
8687

88+
/// Provide the byte size of this file.
89+
///
90+
/// If provided, the file size will ensure that only bounded range requests are used. If file
91+
/// size is not provided, the reader will use suffix range requests to fetch the metadata.
92+
///
93+
/// Providing this size up front is an important optimization to avoid extra calls when the
94+
/// underlying store does not support suffix range requests.
95+
///
96+
/// The file size can be obtained using [`ObjectStore::list`] or [`ObjectStore::head`].
97+
pub fn with_file_size(self, file_size: usize) -> Self {
98+
Self {
99+
file_size: Some(file_size),
100+
..self
101+
}
102+
}
103+
87104
/// Load the Column Index as part of [`Self::get_metadata`]
88105
pub fn with_preload_column_index(self, preload_column_index: bool) -> Self {
89106
Self {
@@ -125,7 +142,7 @@ impl ParquetObjectReader {
125142
{
126143
match &self.runtime {
127144
Some(handle) => {
128-
let path = self.meta.location.clone();
145+
let path = self.path.clone();
129146
let store = Arc::clone(&self.store);
130147
handle
131148
.spawn(async move { f(&store, &path).await })
@@ -138,13 +155,27 @@ impl ParquetObjectReader {
138155
)
139156
.boxed()
140157
}
141-
None => f(&self.store, &self.meta.location)
142-
.map_err(|e| e.into())
143-
.boxed(),
158+
None => f(&self.store, &self.path).map_err(|e| e.into()).boxed(),
144159
}
145160
}
146161
}
147162

163+
impl MetadataSuffixFetch for &mut ParquetObjectReader {
164+
fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
165+
let options = GetOptions {
166+
range: Some(GetRange::Suffix(suffix)),
167+
..Default::default()
168+
};
169+
self.spawn(|store, path| {
170+
async move {
171+
let resp = store.get_opts(path, options).await?;
172+
Ok::<_, ParquetError>(resp.bytes().await?)
173+
}
174+
.boxed()
175+
})
176+
}
177+
}
178+
148179
impl AsyncFileReader for ParquetObjectReader {
149180
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
150181
self.spawn(|store, path| store.get_range(path, range))
@@ -165,13 +196,16 @@ impl AsyncFileReader for ParquetObjectReader {
165196
// `Self::get_bytes`.
166197
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
167198
Box::pin(async move {
168-
let file_size = self.meta.size;
169-
let metadata = ParquetMetaDataReader::new()
199+
let metadata_reader = ParquetMetaDataReader::new()
170200
.with_column_indexes(self.preload_column_index)
171201
.with_offset_indexes(self.preload_offset_index)
172-
.with_prefetch_hint(self.metadata_size_hint)
173-
.load_and_finish(self, file_size)
174-
.await?;
202+
.with_prefetch_hint(self.metadata_size_hint);
203+
let metadata = if let Some(file_size) = self.file_size {
204+
metadata_reader.load_and_finish(self, file_size).await?
205+
} else {
206+
metadata_reader.load_via_suffix_and_finish(self).await?
207+
};
208+
175209
Ok(Arc::new(metadata))
176210
})
177211
}
@@ -181,7 +215,6 @@ impl AsyncFileReader for ParquetObjectReader {
181215
options: &'a ArrowReaderOptions,
182216
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
183217
Box::pin(async move {
184-
let file_size = self.meta.size;
185218
let metadata = ParquetMetaDataReader::new()
186219
.with_column_indexes(self.preload_column_index)
187220
.with_offset_indexes(self.preload_offset_index)
@@ -191,7 +224,11 @@ impl AsyncFileReader for ParquetObjectReader {
191224
let metadata =
192225
metadata.with_decryption_properties(options.file_decryption_properties.as_ref());
193226

194-
let metadata = metadata.load_and_finish(self, file_size).await?;
227+
let metadata = if let Some(file_size) = self.file_size {
228+
metadata.load_and_finish(self, file_size).await?
229+
} else {
230+
metadata.load_via_suffix_and_finish(self).await?
231+
};
195232

196233
Ok(Arc::new(metadata))
197234
})
@@ -231,7 +268,22 @@ mod tests {
231268
#[tokio::test]
232269
async fn test_simple() {
233270
let (meta, store) = get_meta_store().await;
234-
let object_reader = ParquetObjectReader::new(store, meta);
271+
let object_reader =
272+
ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);
273+
274+
let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
275+
.await
276+
.unwrap();
277+
let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();
278+
279+
assert_eq!(batches.len(), 1);
280+
assert_eq!(batches[0].num_rows(), 8);
281+
}
282+
283+
#[tokio::test]
284+
async fn test_simple_without_file_length() {
285+
let (meta, store) = get_meta_store().await;
286+
let object_reader = ParquetObjectReader::new(store, meta.location);
235287

236288
let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
237289
.await
@@ -247,7 +299,8 @@ mod tests {
247299
let (mut meta, store) = get_meta_store().await;
248300
meta.location = Path::from("I don't exist.parquet");
249301

250-
let object_reader = ParquetObjectReader::new(store, meta);
302+
let object_reader =
303+
ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);
251304
// Cannot use unwrap_err as ParquetRecordBatchStreamBuilder: !Debug
252305
match ParquetRecordBatchStreamBuilder::new(object_reader).await {
253306
Ok(_) => panic!("expected failure"),
@@ -280,7 +333,9 @@ mod tests {
280333

281334
let initial_actions = num_actions.load(Ordering::Relaxed);
282335

283-
let reader = ParquetObjectReader::new(store, meta).with_runtime(rt.handle().clone());
336+
let reader = ParquetObjectReader::new(store, meta.location)
337+
.with_file_size(meta.size)
338+
.with_runtime(rt.handle().clone());
284339

285340
let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
286341
let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();
@@ -306,7 +361,9 @@ mod tests {
306361

307362
let (meta, store) = get_meta_store().await;
308363

309-
let reader = ParquetObjectReader::new(store, meta).with_runtime(rt.handle().clone());
364+
let reader = ParquetObjectReader::new(store, meta.location)
365+
.with_file_size(meta.size)
366+
.with_runtime(rt.handle().clone());
310367

311368
let current_id = std::thread::current().id();
312369

@@ -329,7 +386,9 @@ mod tests {
329386

330387
let (meta, store) = get_meta_store().await;
331388

332-
let mut reader = ParquetObjectReader::new(store, meta).with_runtime(rt.handle().clone());
389+
let mut reader = ParquetObjectReader::new(store, meta.location)
390+
.with_file_size(meta.size)
391+
.with_runtime(rt.handle().clone());
333392

334393
rt.shutdown_background();
335394

0 commit comments

Comments
 (0)