Skip to content

Commit 3293a8c

Browse files
authored
Deprecate MetadataLoader (#6474)
* deprecate MetadataLoader * change signature of the load functions * fix up fetch_parquet_metadata * can now use self.meta.size directly * revert changes to load API * revert change to test code
1 parent f0e39cc commit 3293a8c

File tree

6 files changed

+60
-54
lines changed

6 files changed

+60
-54
lines changed

parquet/src/arrow/async_reader/metadata.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
5252
/// Create a new [`MetadataLoader`] by reading the footer information
5353
///
5454
/// See [`fetch_parquet_metadata`] for the meaning of the individual parameters
55+
#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
5556
pub async fn load(mut fetch: F, file_size: usize, prefetch: Option<usize>) -> Result<Self> {
5657
if file_size < FOOTER_SIZE {
5758
return Err(ParquetError::EOF(format!(
@@ -108,6 +109,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
108109
}
109110

110111
/// Create a new [`MetadataLoader`] from an existing [`ParquetMetaData`]
112+
#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
111113
pub fn new(fetch: F, metadata: ParquetMetaData) -> Self {
112114
Self {
113115
fetch,
@@ -120,6 +122,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
120122
///
121123
/// * `column_index`: if true will load column index
122124
/// * `offset_index`: if true will load offset index
125+
#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
123126
pub async fn load_page_index(&mut self, column_index: bool, offset_index: bool) -> Result<()> {
124127
if !column_index && !offset_index {
125128
return Ok(());
@@ -226,6 +229,7 @@ where
226229
/// in the first request, instead of 8, and only issue further requests
227230
/// if additional bytes are needed. Providing a `prefetch` hint can therefore
228231
/// significantly reduce the number of `fetch` requests, and consequently latency
232+
#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
229233
pub async fn fetch_parquet_metadata<F, Fut>(
230234
fetch: F,
231235
file_size: usize,
@@ -236,10 +240,14 @@ where
236240
Fut: Future<Output = Result<Bytes>> + Send,
237241
{
238242
let fetch = MetadataFetchFn(fetch);
239-
let loader = MetadataLoader::load(fetch, file_size, prefetch).await?;
240-
Ok(loader.finish())
243+
ParquetMetaDataReader::new()
244+
.with_prefetch_hint(prefetch)
245+
.load_and_finish(fetch, file_size)
246+
.await
241247
}
242248

249+
// these tests are all replicated in parquet::file::metadata::reader
250+
#[allow(deprecated)]
243251
#[cfg(test)]
244252
mod tests {
245253
use super::*;

parquet/src/arrow/async_reader/mod.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -212,16 +212,18 @@ impl ArrowReaderMetadata {
212212
input: &mut T,
213213
options: ArrowReaderOptions,
214214
) -> Result<Self> {
215+
// TODO: this is all rather awkward. It would be nice if AsyncFileReader::get_metadata
216+
// took an argument to fetch the page indexes.
215217
let mut metadata = input.get_metadata().await?;
216218

217219
if options.page_index
218220
&& metadata.column_index().is_none()
219221
&& metadata.offset_index().is_none()
220222
{
221223
let m = Arc::try_unwrap(metadata).unwrap_or_else(|e| e.as_ref().clone());
222-
let mut loader = MetadataLoader::new(input, m);
223-
loader.load_page_index(true, true).await?;
224-
metadata = Arc::new(loader.finish())
224+
let mut reader = ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true);
225+
reader.load_page_index(input).await?;
226+
metadata = Arc::new(reader.finish()?)
225227
}
226228
Self::try_new(metadata, options)
227229
}

parquet/src/arrow/async_reader/store.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ use futures::{FutureExt, TryFutureExt};
2424

2525
use object_store::{ObjectMeta, ObjectStore};
2626

27-
use crate::arrow::async_reader::{AsyncFileReader, MetadataLoader};
27+
use crate::arrow::async_reader::AsyncFileReader;
2828
use crate::errors::Result;
29-
use crate::file::metadata::ParquetMetaData;
29+
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
3030

3131
/// Reads Parquet files in object storage using [`ObjectStore`].
3232
///
@@ -124,15 +124,14 @@ impl AsyncFileReader for ParquetObjectReader {
124124

125125
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
126126
Box::pin(async move {
127-
let preload_column_index = self.preload_column_index;
128-
let preload_offset_index = self.preload_offset_index;
129127
let file_size = self.meta.size;
130-
let prefetch = self.metadata_size_hint;
131-
let mut loader = MetadataLoader::load(self, file_size, prefetch).await?;
132-
loader
133-
.load_page_index(preload_column_index, preload_offset_index)
128+
let metadata = ParquetMetaDataReader::new()
129+
.with_column_indexes(self.preload_column_index)
130+
.with_offset_indexes(self.preload_offset_index)
131+
.with_prefetch_hint(self.metadata_size_hint)
132+
.load_and_finish(self, file_size)
134133
.await?;
135-
Ok(Arc::new(loader.finish()))
134+
Ok(Arc::new(metadata))
136135
})
137136
}
138137
}

parquet/src/file/metadata/reader.rs

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -329,13 +329,18 @@ impl ParquetMetaDataReader {
329329
return Ok(());
330330
}
331331

332-
self.load_page_index(fetch, remainder).await
332+
self.load_page_index_with_remainder(fetch, remainder).await
333333
}
334334

335335
/// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already
336336
/// been obtained. See [`Self::new_with_metadata()`].
337337
#[cfg(feature = "async")]
338-
pub async fn load_page_index<F: MetadataFetch>(
338+
pub async fn load_page_index<F: MetadataFetch>(&mut self, fetch: F) -> Result<()> {
339+
self.load_page_index_with_remainder(fetch, None).await
340+
}
341+
342+
#[cfg(feature = "async")]
343+
async fn load_page_index_with_remainder<F: MetadataFetch>(
339344
&mut self,
340345
mut fetch: F,
341346
remainder: Option<(usize, Bytes)>,
@@ -836,7 +841,7 @@ mod async_tests {
836841

837842
struct MetadataFetchFn<F>(F);
838843

839-
impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
844+
impl<'a, F, Fut> MetadataFetch for &'a mut MetadataFetchFn<F>
840845
where
841846
F: FnMut(Range<usize>) -> Fut + Send,
842847
Fut: Future<Output = Result<Bytes>> + Send,
@@ -865,74 +870,68 @@ mod async_tests {
865870
let expected = expected.file_metadata().schema();
866871
let fetch_count = AtomicUsize::new(0);
867872

868-
let mut fetch = |range| {
873+
let fetch = |range| {
869874
fetch_count.fetch_add(1, Ordering::SeqCst);
870875
futures::future::ready(read_range(&mut file, range))
871876
};
872877

873-
let input = MetadataFetchFn(&mut fetch);
878+
let mut f = MetadataFetchFn(fetch);
874879
let actual = ParquetMetaDataReader::new()
875-
.load_and_finish(input, len)
880+
.load_and_finish(&mut f, len)
876881
.await
877882
.unwrap();
878883
assert_eq!(actual.file_metadata().schema(), expected);
879884
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
880885

881886
// Metadata hint too small - below footer size
882887
fetch_count.store(0, Ordering::SeqCst);
883-
let input = MetadataFetchFn(&mut fetch);
884888
let actual = ParquetMetaDataReader::new()
885889
.with_prefetch_hint(Some(7))
886-
.load_and_finish(input, len)
890+
.load_and_finish(&mut f, len)
887891
.await
888892
.unwrap();
889893
assert_eq!(actual.file_metadata().schema(), expected);
890894
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
891895

892896
// Metadata hint too small
893897
fetch_count.store(0, Ordering::SeqCst);
894-
let input = MetadataFetchFn(&mut fetch);
895898
let actual = ParquetMetaDataReader::new()
896899
.with_prefetch_hint(Some(10))
897-
.load_and_finish(input, len)
900+
.load_and_finish(&mut f, len)
898901
.await
899902
.unwrap();
900903
assert_eq!(actual.file_metadata().schema(), expected);
901904
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
902905

903906
// Metadata hint too large
904907
fetch_count.store(0, Ordering::SeqCst);
905-
let input = MetadataFetchFn(&mut fetch);
906908
let actual = ParquetMetaDataReader::new()
907909
.with_prefetch_hint(Some(500))
908-
.load_and_finish(input, len)
910+
.load_and_finish(&mut f, len)
909911
.await
910912
.unwrap();
911913
assert_eq!(actual.file_metadata().schema(), expected);
912914
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
913915

914916
// Metadata hint exactly correct
915917
fetch_count.store(0, Ordering::SeqCst);
916-
let input = MetadataFetchFn(&mut fetch);
917918
let actual = ParquetMetaDataReader::new()
918919
.with_prefetch_hint(Some(428))
919-
.load_and_finish(input, len)
920+
.load_and_finish(&mut f, len)
920921
.await
921922
.unwrap();
922923
assert_eq!(actual.file_metadata().schema(), expected);
923924
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
924925

925-
let input = MetadataFetchFn(&mut fetch);
926926
let err = ParquetMetaDataReader::new()
927-
.load_and_finish(input, 4)
927+
.load_and_finish(&mut f, 4)
928928
.await
929929
.unwrap_err()
930930
.to_string();
931931
assert_eq!(err, "EOF: file size of 4 is less than footer");
932932

933-
let input = MetadataFetchFn(&mut fetch);
934933
let err = ParquetMetaDataReader::new()
935-
.load_and_finish(input, 20)
934+
.load_and_finish(&mut f, 20)
936935
.await
937936
.unwrap_err()
938937
.to_string();
@@ -949,42 +948,39 @@ mod async_tests {
949948
futures::future::ready(read_range(&mut file, range))
950949
};
951950

952-
let f = MetadataFetchFn(&mut fetch);
951+
let mut f = MetadataFetchFn(&mut fetch);
953952
let mut loader = ParquetMetaDataReader::new().with_page_indexes(true);
954-
loader.try_load(f, len).await.unwrap();
953+
loader.try_load(&mut f, len).await.unwrap();
955954
assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
956955
let metadata = loader.finish().unwrap();
957956
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
958957

959958
// Prefetch just footer exactly
960959
fetch_count.store(0, Ordering::SeqCst);
961-
let f = MetadataFetchFn(&mut fetch);
962960
let mut loader = ParquetMetaDataReader::new()
963961
.with_page_indexes(true)
964962
.with_prefetch_hint(Some(1729));
965-
loader.try_load(f, len).await.unwrap();
963+
loader.try_load(&mut f, len).await.unwrap();
966964
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
967965
let metadata = loader.finish().unwrap();
968966
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
969967

970968
// Prefetch more than footer but not enough
971969
fetch_count.store(0, Ordering::SeqCst);
972-
let f = MetadataFetchFn(&mut fetch);
973970
let mut loader = ParquetMetaDataReader::new()
974971
.with_page_indexes(true)
975972
.with_prefetch_hint(Some(130649));
976-
loader.try_load(f, len).await.unwrap();
973+
loader.try_load(&mut f, len).await.unwrap();
977974
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
978975
let metadata = loader.finish().unwrap();
979976
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
980977

981978
// Prefetch exactly enough
982979
fetch_count.store(0, Ordering::SeqCst);
983-
let f = MetadataFetchFn(&mut fetch);
984980
let metadata = ParquetMetaDataReader::new()
985981
.with_page_indexes(true)
986982
.with_prefetch_hint(Some(130650))
987-
.load_and_finish(f, len)
983+
.load_and_finish(&mut f, len)
988984
.await
989985
.unwrap();
990986
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

parquet/src/file/metadata/writer.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -516,7 +516,7 @@ mod tests {
516516
/// Temporary function so we can test loading metadata with page indexes
517517
/// while we haven't fully figured out how to load it cleanly
518518
async fn load_metadata_from_bytes(file_size: usize, data: Bytes) -> ParquetMetaData {
519-
use crate::arrow::async_reader::{MetadataFetch, MetadataLoader};
519+
use crate::arrow::async_reader::MetadataFetch;
520520
use crate::errors::Result as ParquetResult;
521521
use futures::future::BoxFuture;
522522
use futures::FutureExt;
@@ -569,13 +569,11 @@ mod tests {
569569
Box::new(AsyncBytes::new(data)),
570570
file_size - metadata_length..file_size,
571571
);
572-
let metadata = MetadataLoader::load(&mut reader, file_size, None)
572+
ParquetMetaDataReader::new()
573+
.with_page_indexes(true)
574+
.load_and_finish(&mut reader, file_size)
573575
.await
574-
.unwrap();
575-
let loaded_metadata = metadata.finish();
576-
let mut metadata = MetadataLoader::new(&mut reader, loaded_metadata);
577-
metadata.load_page_index(true, true).await.unwrap();
578-
metadata.finish()
576+
.unwrap()
579577
}
580578

581579
fn check_columns_are_equivalent(left: &ColumnChunkMetaData, right: &ColumnChunkMetaData) {

parquet/tests/arrow_reader/bad_data.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -140,20 +140,23 @@ fn read_file(name: &str) -> Result<usize, ParquetError> {
140140
#[tokio::test]
141141
async fn bad_metadata_err() {
142142
use bytes::Bytes;
143-
use parquet::arrow::async_reader::MetadataLoader;
143+
use parquet::file::metadata::ParquetMetaDataReader;
144144

145145
let metadata_buffer = Bytes::from_static(include_bytes!("bad_raw_metadata.bin"));
146146

147147
let metadata_length = metadata_buffer.len();
148148

149149
let mut reader = std::io::Cursor::new(&metadata_buffer);
150-
let mut loader = MetadataLoader::load(&mut reader, metadata_length, None)
151-
.await
152-
.unwrap();
153-
loader.load_page_index(false, false).await.unwrap();
154-
loader.load_page_index(false, true).await.unwrap();
150+
let mut loader = ParquetMetaDataReader::new();
151+
loader.try_load(&mut reader, metadata_length).await.unwrap();
152+
loader = loader.with_page_indexes(false);
153+
loader.load_page_index(&mut reader).await.unwrap();
155154

156-
let err = loader.load_page_index(true, false).await.unwrap_err();
155+
loader = loader.with_offset_indexes(true);
156+
loader.load_page_index(&mut reader).await.unwrap();
157+
158+
loader = loader.with_column_indexes(true);
159+
let err = loader.load_page_index(&mut reader).await.unwrap_err();
157160

158161
assert_eq!(
159162
err.to_string(),

0 commit comments

Comments
 (0)