diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 253625117c2b..f548ea1e8c5e 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -34,9 +34,7 @@ use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
 use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
 use crate::column::page::{PageIterator, PageReader};
 use crate::errors::{ParquetError, Result};
-use crate::file::footer;
-use crate::file::metadata::ParquetMetaData;
-use crate::file::page_index::index_reader;
+use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
 use crate::file::reader::{ChunkReader, SerializedPageReader};
 use crate::schema::types::SchemaDescriptor;
 
@@ -382,23 +380,9 @@ impl ArrowReaderMetadata {
     /// `Self::metadata` is missing the page index, this function will attempt
     /// to load the page index by making an object store request.
     pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
-        let mut metadata = footer::parse_metadata(reader)?;
-        if options.page_index {
-            let column_index = metadata
-                .row_groups()
-                .iter()
-                .map(|rg| index_reader::read_columns_indexes(reader, rg.columns()))
-                .collect::<Result<Vec<_>>>()?;
-            metadata.set_column_index(Some(column_index));
-
-            let offset_index = metadata
-                .row_groups()
-                .iter()
-                .map(|rg| index_reader::read_offset_indexes(reader, rg.columns()))
-                .collect::<Result<Vec<_>>>()?;
-
-            metadata.set_offset_index(Some(offset_index))
-        }
+        let metadata = ParquetMetaDataReader::new()
+            .with_page_indexes(options.page_index)
+            .parse(reader)?;
         Self::try_new(Arc::new(metadata), options)
     }
 
@@ -3496,9 +3480,8 @@ mod tests {
             ArrowReaderOptions::new().with_page_index(true),
         )
         .unwrap();
-        // Although `Vec<Vec<PageLocation>>` of each row group is empty,
-        // we should read the file successfully.
-        assert!(builder.metadata().offset_index().unwrap()[0].is_empty());
+        // Absent page indexes should be left as `None`, and the file should still be readable.
+        assert!(builder.metadata().offset_index().is_none());
         let reader = builder.build().unwrap();
         let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
         assert_eq!(batches.len(), 1);
diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs
index b4bf77f2608d..90d54263b0c1 100644
--- a/parquet/src/arrow/async_reader/metadata.rs
+++ b/parquet/src/arrow/async_reader/metadata.rs
@@ -17,8 +17,7 @@
 use crate::arrow::async_reader::AsyncFileReader;
 use crate::errors::{ParquetError, Result};
-use crate::file::footer::{decode_footer, decode_metadata};
-use crate::file::metadata::ParquetMetaData;
+use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
 use crate::file::page_index::index::Index;
 use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
 use crate::file::FOOTER_SIZE;
 
@@ -76,13 +75,13 @@ impl<F: MetadataFetch> MetadataLoader<F> {
         let mut footer = [0; FOOTER_SIZE];
         footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);
 
-        let length = decode_footer(&footer)?;
+        let length = ParquetMetaDataReader::decode_footer(&footer)?;
 
         if file_size < length + FOOTER_SIZE {
             return Err(ParquetError::EOF(format!(
                 "file size of {} is less than footer + metadata {}",
                 file_size,
-                length + 8
+                length + FOOTER_SIZE
             )));
         }
 
@@ -90,13 +89,13 @@
         let (metadata, remainder) = if length > suffix_len - FOOTER_SIZE {
             let metadata_start = file_size - length - FOOTER_SIZE;
             let meta = fetch.fetch(metadata_start..file_size - FOOTER_SIZE).await?;
-            (decode_metadata(&meta)?, None)
+            (ParquetMetaDataReader::decode_metadata(&meta)?, None)
         } else {
             let metadata_start = file_size - length - FOOTER_SIZE - footer_start;
             let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
             (
-                decode_metadata(slice)?,
+                ParquetMetaDataReader::decode_metadata(slice)?,
                 Some((footer_start, suffix.slice(..metadata_start))),
             )
         };
@@ -237,8 +236,9 @@ where
     Fut: Future<Output = Result<Bytes>> + Send,
 {
     let fetch = MetadataFetchFn(fetch);
-    let loader = MetadataLoader::load(fetch, file_size, prefetch).await?;
-    Ok(loader.finish())
+    let mut reader = ParquetMetaDataReader::new().with_prefetch_hint(prefetch);
+    reader.try_load(fetch, file_size).await?;
+    reader.finish()
 }
 
 #[cfg(test)]
@@ -332,41 +332,43 @@ mod tests {
         };
 
         let f = MetadataFetchFn(&mut fetch);
-        let mut loader = MetadataLoader::load(f, len, None).await.unwrap();
-        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
-        loader.load_page_index(true, true).await.unwrap();
+        let mut loader = ParquetMetaDataReader::new().with_page_indexes(true);
+        loader.try_load(f, len).await.unwrap();
         assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
-        let metadata = loader.finish();
+        let metadata = loader.finish().unwrap();
         assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
 
         // Prefetch just footer exactly
         fetch_count.store(0, Ordering::SeqCst);
         let f = MetadataFetchFn(&mut fetch);
-        let mut loader = MetadataLoader::load(f, len, Some(1729)).await.unwrap();
-        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
-        loader.load_page_index(true, true).await.unwrap();
+        let mut loader = ParquetMetaDataReader::new()
+            .with_page_indexes(true)
+            .with_prefetch_hint(Some(1729));
+        loader.try_load(f, len).await.unwrap();
         assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
-        let metadata = loader.finish();
+        let metadata = loader.finish().unwrap();
         assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
 
         // Prefetch more than footer but not enough
         fetch_count.store(0, Ordering::SeqCst);
         let f = MetadataFetchFn(&mut fetch);
-        let mut loader = MetadataLoader::load(f, len, Some(130649)).await.unwrap();
-        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
-        loader.load_page_index(true, true).await.unwrap();
+        let mut loader = ParquetMetaDataReader::new()
+            .with_page_indexes(true)
+            .with_prefetch_hint(Some(130649));
+        loader.try_load(f, len).await.unwrap();
         assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
-        let metadata = loader.finish();
+        let metadata = loader.finish().unwrap();
         assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
 
         // Prefetch exactly enough
         fetch_count.store(0, Ordering::SeqCst);
         let f = MetadataFetchFn(&mut fetch);
-        let mut loader = MetadataLoader::load(f, len, Some(130650)).await.unwrap();
-        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
-        loader.load_page_index(true, true).await.unwrap();
+        let mut loader = ParquetMetaDataReader::new()
+            .with_page_indexes(true)
+            .with_prefetch_hint(Some(130650));
+        loader.try_load(f, len).await.unwrap();
         assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
-        let metadata = loader.finish();
+        let metadata = loader.finish().unwrap();
         assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
     }
 }
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 5695dbc10fe1..84b129ffc11d 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -104,8 +104,7 @@ use crate::bloom_filter::{
 };
 use crate::column::page::{PageIterator, PageReader};
 use crate::errors::{ParquetError, Result};
-use crate::file::footer::{decode_footer, decode_metadata};
-use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
 use crate::file::page_index::offset_index::OffsetIndexMetaData;
 use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
 use crate::file::FOOTER_SIZE;
 
@@ -186,14 +185,14 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
             let mut buf = [0_u8; FOOTER_SIZE];
             self.read_exact(&mut buf).await?;
 
-            let metadata_len = decode_footer(&buf)?;
+            let metadata_len = ParquetMetaDataReader::decode_footer(&buf)?;
             self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
                 .await?;
 
             let mut buf = Vec::with_capacity(metadata_len);
             self.take(metadata_len as _).read_to_end(&mut buf).await?;
 
-            Ok(Arc::new(decode_metadata(&buf)?))
+            Ok(Arc::new(ParquetMetaDataReader::decode_metadata(&buf)?))
         }
         .boxed()
     }
@@ -220,9 +219,9 @@ impl ArrowReaderMetadata {
             && metadata.offset_index().is_none()
         {
             let m = Arc::try_unwrap(metadata).unwrap_or_else(|e| e.as_ref().clone());
-            let mut loader = MetadataLoader::new(input, m);
-            loader.load_page_index(true, true).await?;
-            metadata = Arc::new(loader.finish())
+            let mut reader = ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true);
+            reader.load_page_index(input, None).await?;
+            metadata = Arc::new(reader.finish()?)
         }
         Self::try_new(metadata, options)
     }
 
@@ -909,7 +908,6 @@ mod tests {
     };
     use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
     use crate::arrow::ArrowWriter;
-    use crate::file::footer::parse_metadata;
     use crate::file::page_index::index_reader;
     use crate::file::properties::WriterProperties;
     use arrow::compute::kernels::cmp::eq;
@@ -946,13 +944,17 @@ mod tests {
         }
     }
 
+    fn get_parquet_metadata(data: &Bytes) -> ParquetMetaData {
+        ParquetMetaDataReader::new().parse(data).unwrap()
+    }
+
     #[tokio::test]
     async fn test_async_reader() {
         let testdata = arrow::util::test_util::parquet_test_data();
         let path = format!("{testdata}/alltypes_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());
 
-        let metadata = parse_metadata(&data).unwrap();
+        let metadata = get_parquet_metadata(&data);
         let metadata = Arc::new(metadata);
 
         assert_eq!(metadata.num_row_groups(), 1);
@@ -1007,7 +1009,7 @@ mod tests {
         let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());
 
-        let metadata = parse_metadata(&data).unwrap();
+        let metadata = get_parquet_metadata(&data);
         let metadata = Arc::new(metadata);
 
         assert_eq!(metadata.num_row_groups(), 1);
@@ -1073,7 +1075,7 @@ mod tests {
         let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());
 
-        let metadata = parse_metadata(&data).unwrap();
+        let metadata = get_parquet_metadata(&data);
         let metadata = Arc::new(metadata);
 
         assert_eq!(metadata.num_row_groups(), 1);
@@ -1117,7 +1119,7 @@ mod tests {
         let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());
 
-        let metadata = parse_metadata(&data).unwrap();
+        let metadata = get_parquet_metadata(&data);
         let metadata = Arc::new(metadata);
 
         assert_eq!(metadata.num_row_groups(), 1);
@@ -1173,7 +1175,7 @@ mod tests {
         let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());
 
-        let metadata = parse_metadata(&data).unwrap();
+        let metadata = get_parquet_metadata(&data);
         let metadata = Arc::new(metadata);
 
         assert_eq!(metadata.num_row_groups(), 1);
@@ -1238,7 +1240,7 @@ mod tests {
         let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());
 
-        let metadata = parse_metadata(&data).unwrap();
+        let metadata = get_parquet_metadata(&data);
         let metadata = Arc::new(metadata);
 
         assert_eq!(metadata.num_row_groups(), 1);
@@ -1317,7 +1319,7 @@ mod tests {
         writer.close().unwrap();
 
         let data: Bytes = buf.into();
-        let metadata = parse_metadata(&data).unwrap();
+        let metadata = get_parquet_metadata(&data);
         let parquet_schema = metadata.file_metadata().schema_descr_ptr();
 
         let test = TestReader {
@@ -1391,7 +1393,7 @@ mod tests {
         writer.close().unwrap();
 
         let data: Bytes = buf.into();
-        let metadata = parse_metadata(&data).unwrap();
+        let metadata = get_parquet_metadata(&data);
 
         assert_eq!(metadata.num_row_groups(), 2);
 
@@ -1479,7 +1481,7 @@ mod tests {
         let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());
 
-        let metadata = parse_metadata(&data).unwrap();
+        let metadata = get_parquet_metadata(&data);
         let parquet_schema = metadata.file_metadata().schema_descr_ptr();
         let metadata = Arc::new(metadata);
 
@@ -1529,7 +1531,7 @@ mod tests {
         let path = format!("{testdata}/alltypes_tiny_pages.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());
 
-        let metadata = parse_metadata(&data).unwrap();
+        let metadata = get_parquet_metadata(&data);
 
         let offset_index =
             index_reader::read_offset_indexes(&data, metadata.row_group(0).columns())
 
@@ -1619,7 +1621,7 @@ mod tests {
         let path = format!("{testdata}/alltypes_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());
 
-        let metadata = parse_metadata(&data).unwrap();
+        let metadata = get_parquet_metadata(&data);
         let file_rows = metadata.file_metadata().num_rows() as usize;
         let metadata = Arc::new(metadata);
 
@@ -1764,7 +1766,7 @@ mod tests {
         let testdata = arrow::util::test_util::parquet_test_data();
         let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());
-        let metadata = parse_metadata(&data).unwrap();
+        let metadata = get_parquet_metadata(&data);
         let metadata = Arc::new(metadata);
         let async_reader = TestReader {
             data: data.clone(),
@@ -1793,7 +1795,7 @@ mod tests {
     }
 
     async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
-        let metadata = parse_metadata(&data).unwrap();
+        let metadata = get_parquet_metadata(&data);
         let metadata = Arc::new(metadata);
 
         assert_eq!(metadata.num_row_groups(), 1);
@@ -1933,7 +1935,7 @@ mod tests {
         writer.close().unwrap();
 
         let data: Bytes = buf.into();
-        let metadata = parse_metadata(&data).unwrap();
+        let metadata = get_parquet_metadata(&data);
         let parquet_schema = metadata.file_metadata().schema_descr_ptr();
 
         let test = TestReader {
diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs
index 77c00e91a3aa..321f8cd07632 100644
--- a/parquet/src/arrow/async_reader/store.rs
+++ b/parquet/src/arrow/async_reader/store.rs
@@ -24,9 +24,9 @@ use futures::{FutureExt, TryFutureExt};
 use object_store::{ObjectMeta, ObjectStore};
 
-use crate::arrow::async_reader::{AsyncFileReader, MetadataLoader};
+use crate::arrow::async_reader::AsyncFileReader;
 use crate::errors::Result;
-use crate::file::metadata::ParquetMetaData;
+use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
 
 /// Reads Parquet files in object storage using [`ObjectStore`].
 ///
@@ -124,15 +124,13 @@ impl AsyncFileReader for ParquetObjectReader {
 
     fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
         Box::pin(async move {
-            let preload_column_index = self.preload_column_index;
-            let preload_offset_index = self.preload_offset_index;
+            let mut reader = ParquetMetaDataReader::new()
+                .with_column_indexes(self.preload_column_index)
+                .with_offset_indexes(self.preload_offset_index)
+                .with_prefetch_hint(self.metadata_size_hint);
             let file_size = self.meta.size;
-            let prefetch = self.metadata_size_hint;
-            let mut loader = MetadataLoader::load(self, file_size, prefetch).await?;
-            loader
-                .load_page_index(preload_column_index, preload_offset_index)
-                .await?;
-            Ok(Arc::new(loader.finish()))
+            reader.try_load(self, file_size).await?;
+            Ok(Arc::new(reader.finish()?))
         })
     }
 }
diff --git a/parquet/src/bin/parquet-concat.rs b/parquet/src/bin/parquet-concat.rs
index 9cbdf8e7b399..cc878237bc4b 100644
--- a/parquet/src/bin/parquet-concat.rs
+++ b/parquet/src/bin/parquet-concat.rs
@@ -39,6 +39,7 @@ use clap::Parser;
 use parquet::column::writer::ColumnCloseResult;
 use parquet::errors::{ParquetError, Result};
+use parquet::file::metadata::ParquetMetaDataReader;
 use parquet::file::properties::WriterProperties;
 use parquet::file::writer::SerializedFileWriter;
 use std::fs::File;
@@ -70,7 +71,7 @@ impl Args {
             .iter()
             .map(|x| {
                 let reader = File::open(x)?;
-                let metadata = parquet::file::footer::parse_metadata(&reader)?;
+                let metadata = ParquetMetaDataReader::new().parse(&reader)?;
                 Ok((reader, metadata))
             })
             .collect::<Result<Vec<_>>>()?;
diff --git a/parquet/src/bin/parquet-layout.rs b/parquet/src/bin/parquet-layout.rs
index 79a0acb5f57c..11e8e0ccfc0d 100644
--- a/parquet/src/bin/parquet-layout.rs
+++ b/parquet/src/bin/parquet-layout.rs
@@ -37,6 +37,7 @@ use std::fs::File;
 use std::io::Read;
 
 use clap::Parser;
+use parquet::file::metadata::ParquetMetaDataReader;
 use serde::Serialize;
 use thrift::protocol::TCompactInputProtocol;
@@ -79,7 +80,7 @@ struct Page {
 }
 
 fn do_layout<C: ChunkReader>(reader: &C) -> Result<ParquetLayout> {
-    let metadata = parquet::file::footer::parse_metadata(reader)?;
+    let metadata = ParquetMetaDataReader::new().parse(reader)?;
     let schema = metadata.file_metadata().schema_descr();
 
     let row_groups = (0..metadata.num_row_groups())
diff --git a/parquet/src/errors.rs b/parquet/src/errors.rs
index a242c9768514..df515ef39492 100644
--- a/parquet/src/errors.rs
+++ b/parquet/src/errors.rs
@@ -45,6 +45,8 @@ pub enum ParquetError {
     IndexOutOfBound(usize, usize),
     /// An external error variant
     External(Box<dyn Error + Send + Sync>),
+    /// Returned when a function needs more data to complete properly.
+    NeedMoreData(usize),
 }
 
 impl std::fmt::Display for ParquetError {
@@ -61,6 +63,7 @@ impl std::fmt::Display for ParquetError {
                 write!(fmt, "Index {index} out of bound: {bound}")
             }
             ParquetError::External(e) => write!(fmt, "External: {e}"),
+            ParquetError::NeedMoreData(needed) => write!(fmt, "NeedMoreData: {needed}"),
         }
     }
 }
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index f3c73ee81a0e..768b17fd6920 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -94,6 +94,7 @@
 //! * Same name, different struct
 //! ```
 mod memory;
+pub(crate) mod reader;
 mod writer;
 
 use std::ops::Range;
@@ -115,6 +116,7 @@ use crate::schema::types::{
     ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, SchemaDescriptor,
     Type as SchemaType,
 };
+pub use reader::ParquetMetaDataReader;
 pub use writer::ParquetMetaDataWriter;
 pub(crate) use writer::ThriftMetadataWriter;
 
@@ -278,13 +280,11 @@ impl ParquetMetaData {
     }
 
     /// Override the column index
-    #[cfg(feature = "arrow")]
     pub(crate) fn set_column_index(&mut self, index: Option<ParquetColumnIndex>) {
         self.column_index = index;
     }
 
     /// Override the offset index
-    #[cfg(feature = "arrow")]
     pub(crate) fn set_offset_index(&mut self, index: Option<ParquetOffsetIndex>) {
         self.offset_index = index;
     }
diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs
new file mode 100644
index 000000000000..103c22a5c14f
--- /dev/null
+++ b/parquet/src/file/metadata/reader.rs
@@ -0,0 +1,914 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{io::Read, ops::Range, sync::Arc};
+
+use bytes::Bytes;
+
+use crate::basic::ColumnOrder;
+use crate::errors::{ParquetError, Result};
+use crate::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData};
+use crate::file::page_index::index::Index;
+use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
+use crate::file::reader::ChunkReader;
+use crate::file::{FOOTER_SIZE, PARQUET_MAGIC};
+use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
+use crate::schema::types;
+use crate::schema::types::SchemaDescriptor;
+use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
+
+#[cfg(feature = "async")]
+use crate::arrow::async_reader::MetadataFetch;
+
+/// Reads the [`ParquetMetaData`] from a byte stream.
+///
+/// See [`crate::file::metadata::ParquetMetaDataWriter#output-format`] for a description of
+/// the Parquet metadata.
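+///
+/// Supports synchronous footer reads via [`Self::parse()`] and [`Self::try_parse()`] and,
+/// when the `async` feature is enabled, asynchronous reads via [`Self::load()`] and
+/// [`Self::try_load()`].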
+///
+/// # Example
+/// ```no_run
+/// # use parquet::file::metadata::ParquetMetaDataReader;
+/// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
+/// // read parquet metadata including page indexes
+/// let file = open_parquet_file("some_path.parquet");
+/// let mut reader = ParquetMetaDataReader::new()
+///     .with_page_indexes(true);
+/// reader.try_parse(&file).unwrap();
+/// let metadata = reader.finish().unwrap();
+/// assert!(metadata.column_index().is_some());
+/// assert!(metadata.offset_index().is_some());
+/// ```
+#[derive(Default)]
+pub struct ParquetMetaDataReader {
+    metadata: Option<ParquetMetaData>,
+    column_index: bool,
+    offset_index: bool,
+    prefetch_hint: Option<usize>,
+}
+
+impl ParquetMetaDataReader {
+    /// Create a new [`ParquetMetaDataReader`]
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    /// Create a new [`ParquetMetaDataReader`] populated with a [`ParquetMetaData`] struct
+    /// obtained via other means. Primarily intended for use with [`Self::load_page_index()`].
+    pub fn new_with_metadata(metadata: ParquetMetaData) -> Self {
+        Self {
+            metadata: Some(metadata),
+            ..Default::default()
+        }
+    }
+
+    /// Enable or disable reading the page index structures described in
+    /// "[Parquet page index]: Layout to Support Page Skipping". Equivalent to:
+    /// `self.with_column_indexes(val).with_offset_indexes(val)`
+    ///
+    /// [Parquet page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
+    pub fn with_page_indexes(self, val: bool) -> Self {
+        self.with_column_indexes(val).with_offset_indexes(val)
+    }
+
+    /// Enable or disable reading the Parquet [ColumnIndex] structure.
+    ///
+    /// [ColumnIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
+    pub fn with_column_indexes(mut self, val: bool) -> Self {
+        self.column_index = val;
+        self
+    }
+
+    /// Enable or disable reading the Parquet [OffsetIndex] structure.
+    ///
+    /// [OffsetIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
+    pub fn with_offset_indexes(mut self, val: bool) -> Self {
+        self.offset_index = val;
+        self
+    }
+
+    /// Provide a hint as to the number of bytes needed to fully parse the [`ParquetMetaData`].
+    /// Only used for the asynchronous [`Self::try_load()`] method.
+    ///
+    /// By default, the reader will first fetch the last 8 bytes of the input file to obtain the
+    /// size of the footer metadata. A second fetch will be performed to obtain the needed bytes.
+    /// After parsing the footer metadata, a third fetch will be performed to obtain the bytes
+    /// needed to decode the page index structures, if they have been requested. To avoid
+    /// unnecessary fetches, `prefetch` can be set to an estimate of the number of bytes needed
+    /// to fully decode the [`ParquetMetaData`], which can reduce the number of fetch requests and
+    /// reduce latency. Setting `prefetch` too small will not trigger an error, but will result
+    /// in extra fetches being performed.
+    pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self {
+        self.prefetch_hint = prefetch;
+        self
+    }
+
+    /// Return the parsed [`ParquetMetaData`] struct.
+    pub fn finish(&mut self) -> Result<ParquetMetaData> {
+        self.metadata
+            .take()
+            .ok_or_else(|| general_err!("could not parse parquet metadata"))
+    }
+
+    /// Given a [`ChunkReader`], parse and return the [`ParquetMetaData`] in a single pass.
+    ///
+    /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete
+    /// the request, and must include the Parquet footer. If page indexes are desired, the buffer
+    /// must contain the entire file, or [`Self::try_parse_sized()`] should be used.
+    ///
+    /// This call will consume `self`.
+    ///
+    /// # Example
+    /// ```no_run
+    /// # use parquet::file::metadata::ParquetMetaDataReader;
+    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
+    /// // read parquet metadata including page indexes
+    /// let file = open_parquet_file("some_path.parquet");
+    /// let metadata = ParquetMetaDataReader::new()
+    ///     .with_page_indexes(true)
+    ///     .parse(&file).unwrap();
+    /// ```
+    pub fn parse<R: ChunkReader>(mut self, reader: &R) -> Result<ParquetMetaData> {
+        self.try_parse(reader)?;
+        self.finish()
+    }
+
+    /// Attempts to parse the footer metadata (and optionally page indexes) given a [`ChunkReader`].
+    /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete
+    /// the request, and must include the Parquet footer. If page indexes are desired, the buffer
+    /// must contain the entire file, or [`Self::try_parse_sized()`] should be used.
+    pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
+        self.try_parse_sized(reader, reader.len() as usize)
+    }
+
+    /// Same as [`Self::try_parse()`], but provide the original file size in the case that `reader`
+    /// is a [`Bytes`] struct that does not contain the entire file. This information is necessary
+    /// when the page indexes are desired. `reader` must have access to the Parquet footer.
+    ///
+    /// Using this function also allows for retrying with a larger buffer.
+    ///
+    /// # Example
+    /// ```no_run
+    /// # use parquet::file::metadata::ParquetMetaDataReader;
+    /// # use parquet::errors::ParquetError;
+    /// # use crate::parquet::file::reader::Length;
+    /// # fn get_bytes(file: &std::fs::File, range: std::ops::Range<usize>) -> bytes::Bytes { unimplemented!(); }
+    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
+    /// let file = open_parquet_file("some_path.parquet");
+    /// let len = file.len() as usize;
+    /// let bytes = get_bytes(&file, 1000..len);
+    /// let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
+    /// match reader.try_parse_sized(&bytes, len) {
+    ///     Ok(_) => (),
+    ///     Err(ParquetError::NeedMoreData(needed)) => {
+    ///         let bytes = get_bytes(&file, len - needed..len);
+    ///         reader.try_parse_sized(&bytes, len).unwrap();
+    ///     }
+    ///     _ => panic!("unexpected error")
+    /// }
+    /// let metadata = reader.finish().unwrap();
+    /// ```
+    pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: usize) -> Result<()> {
+        self.metadata = match Self::parse_metadata(reader) {
+            Ok(metadata) => Some(metadata),
+            Err(ParquetError::NeedMoreData(needed)) => {
+                // If reader is the same length as `file_size` then presumably there is no more to
+                // read, so return an EOF error.
+                if file_size == reader.len() as usize || needed > file_size {
+                    return Err(eof_err!(
+                        "Parquet file too small. Size is {} but need {}",
+                        file_size,
+                        needed
+                    ));
+                } else {
+                    // Ask for a larger buffer
+                    return Err(ParquetError::NeedMoreData(needed));
+                }
+            }
+            Err(e) => return Err(e),
+        };
+
+        // we can return if page indexes aren't requested
+        if !self.column_index && !self.offset_index {
+            return Ok(());
+        }
+
+        // TODO(ets): what is the correct behavior for missing page indexes? MetadataLoader would
+        // leave them as `None`, while the parser in `index_reader::read_columns_indexes` returns a
+        // vector of empty vectors.
+        // I think it's best to leave them as `None`.
+
+        // Get bounds needed for page indexes (if any are present in the file).
+        let Some(range) = self.range_for_page_index() else {
+            return Ok(());
+        };
+
+        // TODO(ets): it's kind of wasteful to return NeedMoreData and then re-parse the footer.
+        // Think about adding an equivalent to load_page_index. Or maybe just skip the footer
+        // parse if self.metadata.is_some().
+
+        // Check to see if the needed range is within `file_range`. Checking `range.end` seems
+        // redundant, but it guards against `range_for_page_index()` returning garbage.
+        let file_range = file_size.saturating_sub(reader.len() as usize)..file_size;
+        if !(file_range.contains(&range.start) && file_range.contains(&range.end)) {
+            // Requested range extends beyond EOF
+            if range.end > file_size {
+                return Err(eof_err!(
+                    "Parquet file too small. Range {:?} is beyond file bounds {file_size}",
+                    range
+                ));
+            } else {
+                // Ask for a larger buffer
+                return Err(ParquetError::NeedMoreData(file_size - range.start));
+            }
+        }
+
+        let bytes_needed = range.end - range.start;
+        let bytes = reader.get_bytes((range.start - file_range.start) as u64, bytes_needed)?;
+        let offset = range.start;
+
+        self.parse_column_index(&bytes, offset)?;
+        self.parse_offset_index(&bytes, offset)?;
+
+        Ok(())
+    }
+
+    /// Given a [`MetadataFetch`], parse and return the [`ParquetMetaData`] in a single pass.
+    ///
+    /// This call will consume `self`.
+    ///
+    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
+    /// performed by this function.
+    #[cfg(feature = "async")]
+    pub async fn load<F: MetadataFetch>(
+        mut self,
+        fetch: F,
+        file_size: usize,
+    ) -> Result<ParquetMetaData> {
+        self.try_load(fetch, file_size).await?;
+        self.finish()
+    }
+
+    /// Attempts to (asynchronously) parse the footer metadata (and optionally page indexes)
+    /// given a [`MetadataFetch`].
+    ///
+    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
+    /// performed by this function.
+    #[cfg(feature = "async")]
+    pub async fn try_load<F: MetadataFetch>(
+        &mut self,
+        mut fetch: F,
+        file_size: usize,
+    ) -> Result<()> {
+        let (metadata, remainder) =
+            Self::load_metadata(&mut fetch, file_size, self.get_prefetch_size()).await?;
+
+        self.metadata = Some(metadata);
+
+        // we can return if page indexes aren't requested
+        if !self.column_index && !self.offset_index {
+            return Ok(());
+        }
+
+        self.load_page_index(fetch, remainder).await
+    }
+
+    /// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already
+    /// been obtained. See [`Self::new_with_metadata()`].
+    #[cfg(feature = "async")]
+    pub async fn load_page_index<F: MetadataFetch>(
+        &mut self,
+        mut fetch: F,
+        remainder: Option<(usize, Bytes)>,
+    ) -> Result<()> {
+        // Get bounds needed for page indexes (if any are present in the file).
+        let range = self.range_for_page_index();
+        let range = match range {
+            Some(range) => range,
+            None => return Ok(()),
+        };
+
+        let bytes = match &remainder {
+            Some((remainder_start, remainder)) if *remainder_start <= range.start => {
+                let offset = range.start - *remainder_start;
+                remainder.slice(offset..range.end - *remainder_start + offset)
+            }
+            // Note: this will potentially fetch data already in remainder, this keeps things simple
+            _ => fetch.fetch(range.start..range.end).await?,
+        };
+
+        // Sanity check
+        assert_eq!(bytes.len(), range.end - range.start);
+        let offset = range.start;
+
+        self.parse_column_index(&bytes, offset)?;
+        self.parse_offset_index(&bytes, offset)?;
+
+        Ok(())
+    }
+
+    // TODO(ets): should these go in `index_reader.rs`?
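+
+    // Sketch of the two-step flow that `new_with_metadata()` and `load_page_index()`
+    // enable (assumes `metadata` was decoded elsewhere and `fetch` is any `MetadataFetch`
+    // impl; this mirrors the `ArrowReaderMetadata` usage in `async_reader/mod.rs`):
+    //
+    //     let mut reader = ParquetMetaDataReader::new_with_metadata(metadata)
+    //         .with_page_indexes(true);
+    //     reader.load_page_index(fetch, None).await?;
+    //     let metadata = reader.finish()?;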
+    fn parse_column_index(&mut self, bytes: &Bytes, start_offset: usize) -> Result<()> {
+        let metadata = self.metadata.as_mut().unwrap();
+        if self.column_index {
+            let index = metadata
+                .row_groups()
+                .iter()
+                .map(|x| {
+                    x.columns()
+                        .iter()
+                        .map(|c| match c.column_index_range() {
+                            Some(r) => decode_column_index(
+                                &bytes[r.start - start_offset..r.end - start_offset],
+                                c.column_type(),
+                            ),
+                            None => Ok(Index::NONE),
+                        })
+                        .collect::<Result<Vec<_>>>()
+                })
+                .collect::<Result<Vec<_>>>()?;
+            metadata.set_column_index(Some(index));
+        }
+        Ok(())
+    }
+
+    fn parse_offset_index(&mut self, bytes: &Bytes, start_offset: usize) -> Result<()> {
+        let metadata = self.metadata.as_mut().unwrap();
+        if self.offset_index {
+            let index = metadata
+                .row_groups()
+                .iter()
+                .map(|x| {
+                    x.columns()
+                        .iter()
+                        .map(|c| match c.offset_index_range() {
+                            Some(r) => decode_offset_index(
+                                &bytes[r.start - start_offset..r.end - start_offset],
+                            ),
+                            None => Err(general_err!("missing offset index")),
+                        })
+                        .collect::<Result<Vec<_>>>()
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            metadata.set_offset_index(Some(index));
+        }
+        Ok(())
+    }
+
+    fn range_for_page_index(&self) -> Option<Range<usize>> {
+        // sanity check
+        self.metadata.as_ref()?;
+
+        // Get bounds needed for page indexes (if any are present in the file).
+        let mut range = None;
+        let metadata = self.metadata.as_ref().unwrap();
+        for c in metadata.row_groups().iter().flat_map(|r| r.columns()) {
+            if self.column_index {
+                range = acc_range(range, c.column_index_range());
+            }
+            if self.offset_index {
+                range = acc_range(range, c.offset_index_range());
+            }
+        }
+        range
+    }
+
+    // one-shot parse of footer
+    fn parse_metadata<R: ChunkReader>(chunk_reader: &R) -> Result<ParquetMetaData> {
+        // check file is large enough to hold footer
+        let file_size = chunk_reader.len();
+        if file_size < (FOOTER_SIZE as u64) {
+            return Err(ParquetError::NeedMoreData(FOOTER_SIZE));
+        }
+
+        let mut footer = [0_u8; 8];
+        chunk_reader
+            .get_read(file_size - 8)?
+            .read_exact(&mut footer)?;
+
+        let metadata_len = Self::decode_footer(&footer)?;
+        let footer_metadata_len = FOOTER_SIZE + metadata_len;
+
+        if footer_metadata_len > file_size as usize {
+            return Err(ParquetError::NeedMoreData(footer_metadata_len));
+        }
+
+        let start = file_size - footer_metadata_len as u64;
+        Self::decode_metadata(chunk_reader.get_bytes(start, metadata_len)?.as_ref())
+    }
+
+    /// Return the number of bytes to read in the initial pass. If `prefetch_size` has
+    /// been provided, then return that value if it is larger than the size of the Parquet
+    /// file footer (8 bytes). Otherwise returns `8`.
+    #[cfg(feature = "async")]
+    fn get_prefetch_size(&self) -> usize {
+        if let Some(prefetch) = self.prefetch_hint {
+            if prefetch > FOOTER_SIZE {
+                return prefetch;
+            }
+        }
+        FOOTER_SIZE
+    }
+
+    #[cfg(feature = "async")]
+    async fn load_metadata<F: MetadataFetch>(
+        fetch: &mut F,
+        file_size: usize,
+        prefetch: usize,
+    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
+        if file_size < FOOTER_SIZE {
+            return Err(eof_err!("file size of {} is less than footer", file_size));
+        }
+
+        // If a size hint is provided, read more than the minimum size
+        // to try and avoid a second fetch.
+        // Note: prefetch > file_size is ok since we're using saturating_sub.
+        let footer_start = file_size.saturating_sub(prefetch);
+
+        let suffix = fetch.fetch(footer_start..file_size).await?;
+        let suffix_len = suffix.len();
+        let fetch_len = file_size - footer_start;
+        if suffix_len < fetch_len {
+            return Err(eof_err!(
+                "metadata requires {} bytes, but could only read {}",
+                fetch_len,
+                suffix_len
+            ));
+        }
+
+        let mut footer = [0; FOOTER_SIZE];
+        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);
+
+        let length = Self::decode_footer(&footer)?;
+
+        if file_size < length + FOOTER_SIZE {
+            return Err(eof_err!(
+                "file size of {} is less than footer + metadata {}",
+                file_size,
+                length + FOOTER_SIZE
+            ));
+        }
+
+        // Did not fetch the entire file metadata in the initial read, need to make a second request
+        if length > suffix_len - FOOTER_SIZE {
+            let metadata_start = file_size - length - FOOTER_SIZE;
+            let meta = fetch.fetch(metadata_start..file_size - FOOTER_SIZE).await?;
+            Ok((Self::decode_metadata(&meta)?, None))
+        } else {
+            let metadata_start = file_size - length - FOOTER_SIZE - footer_start;
+            let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
+            Ok((
+                Self::decode_metadata(slice)?,
+                Some((footer_start, suffix.slice(..metadata_start))),
+            ))
+        }
+    }
+
+    /// Decodes the Parquet footer returning the metadata length in bytes
+    ///
+    /// A parquet footer is 8 bytes long and has the following layout:
+    /// * 4 bytes for the metadata length
+    /// * 4 bytes for the magic bytes 'PAR1'
+    ///
+    /// ```text
+    /// +-----+--------+
+    /// | len | 'PAR1' |
+    /// +-----+--------+
+    /// ```
+    pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
+        // check this is indeed a parquet file
+        if slice[4..] != PARQUET_MAGIC {
+            return Err(general_err!("Invalid Parquet file. Corrupt footer"));
+        }
+
+        // get the metadata length from the footer
+        let metadata_len = u32::from_le_bytes(slice[..4].try_into().unwrap());
+        // u32 won't be larger than usize in most cases
+        Ok(metadata_len as usize)
+    }
+
+    /// Decodes [`ParquetMetaData`] from the provided bytes.
+    ///
+    /// Typically this is used to decode the metadata from the end of a parquet
+    /// file. The format of `buf` is the Thrift compact binary protocol, as specified
+    /// by the [Parquet Spec].
+    ///
+    /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
+    pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
+        // TODO: row group filtering
+        let mut prot = TCompactSliceInputProtocol::new(buf);
+        let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
+            .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
+        let schema = types::from_thrift(&t_file_metadata.schema)?;
+        let schema_descr = Arc::new(SchemaDescriptor::new(schema));
+        let mut row_groups = Vec::new();
+        for rg in t_file_metadata.row_groups {
+            row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?);
+        }
+        let column_orders = Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr);
+
+        let file_metadata = FileMetaData::new(
+            t_file_metadata.version,
+            t_file_metadata.num_rows,
+            t_file_metadata.created_by,
+            t_file_metadata.key_value_metadata,
+            schema_descr,
+            column_orders,
+        );
+        Ok(ParquetMetaData::new(file_metadata, row_groups))
+    }
+
+    /// Parses column orders from Thrift definition.
+    /// If no column orders are defined, returns `None`.
+    fn parse_column_orders(
+        t_column_orders: Option<Vec<TColumnOrder>>,
+        schema_descr: &SchemaDescriptor,
+    ) -> Option<Vec<ColumnOrder>> {
+        match t_column_orders {
+            Some(orders) => {
+                // Should always be the case
+                assert_eq!(
+                    orders.len(),
+                    schema_descr.num_columns(),
+                    "Column order length mismatch"
+                );
+                let mut res = Vec::new();
+                for (i, column) in schema_descr.columns().iter().enumerate() {
+                    match orders[i] {
+                        TColumnOrder::TYPEORDER(_) => {
+                            let sort_order = ColumnOrder::get_sort_order(
+                                column.logical_type(),
+                                column.converted_type(),
+                                column.physical_type(),
+                            );
+                            res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
+                        }
+                    }
+                }
+                Some(res)
+            }
+            None => None,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use bytes::Bytes;
+    #[cfg(feature = "async")]
+    use futures::future::BoxFuture;
+    #[cfg(feature = "async")]
+    use futures::FutureExt;
+    use std::fs::File;
+    use std::future::Future;
+    use std::io::{Read, Seek, SeekFrom};
+    use std::sync::atomic::{AtomicUsize, Ordering};
+
+    use crate::basic::SortOrder;
+    use crate::basic::Type;
+    use crate::file::reader::Length;
+    use crate::format::TypeDefinedOrder;
+    use crate::schema::types::Type as SchemaType;
+    use crate::util::test_common::file_util::get_test_file;
+
+    #[test]
+    fn test_parse_metadata_size_smaller_than_footer() {
+        let test_file = tempfile::tempfile().unwrap();
+        let err = ParquetMetaDataReader::parse_metadata(&test_file).unwrap_err();
+        assert!(matches!(err, ParquetError::NeedMoreData(8)));
+    }
+
+    #[test]
+    fn test_parse_metadata_corrupt_footer() {
+        let data = Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
+        let reader_result = ParquetMetaDataReader::parse_metadata(&data);
+        assert_eq!(
+            reader_result.unwrap_err().to_string(),
+            "Parquet error: Invalid Parquet file. Corrupt footer"
+        );
+    }
+
+    #[test]
+    fn test_parse_metadata_invalid_start() {
+        let test_file = Bytes::from(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']);
+        let err = ParquetMetaDataReader::parse_metadata(&test_file).unwrap_err();
+        assert!(matches!(err, ParquetError::NeedMoreData(263)));
+    }
+
+    #[test]
+    fn test_metadata_column_orders_parse() {
+        // Define simple schema, we do not need to provide logical types.
+        let fields = vec![
+            Arc::new(
+                SchemaType::primitive_type_builder("col1", Type::INT32)
+                    .build()
+                    .unwrap(),
+            ),
+            Arc::new(
+                SchemaType::primitive_type_builder("col2", Type::FLOAT)
+                    .build()
+                    .unwrap(),
+            ),
+        ];
+        let schema = SchemaType::group_type_builder("schema")
+            .with_fields(fields)
+            .build()
+            .unwrap();
+        let schema_descr = SchemaDescriptor::new(Arc::new(schema));
+
+        let t_column_orders = Some(vec![
+            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
+            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
+        ]);
+
+        assert_eq!(
+            ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr),
+            Some(vec![
+                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
+                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED)
+            ])
+        );
+
+        // Test when no column orders are defined.
+        assert_eq!(
+            ParquetMetaDataReader::parse_column_orders(None, &schema_descr),
+            None
+        );
+    }
+
+    #[test]
+    #[should_panic(expected = "Column order length mismatch")]
+    fn test_metadata_column_orders_len_mismatch() {
+        let schema = SchemaType::group_type_builder("schema").build().unwrap();
+        let schema_descr = SchemaDescriptor::new(Arc::new(schema));
+
+        let t_column_orders = Some(vec![TColumnOrder::TYPEORDER(TypeDefinedOrder::new())]);
+
+        ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr);
+    }
+
+    #[test]
+    fn test_try_parse() {
+        let file = get_test_file("alltypes_tiny_pages.parquet");
+        let len = file.len() as usize;
+
+        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
+
+        let bytes_for_range = |range: Range<usize>| {
+            file.get_bytes(range.start as u64, range.end - range.start)
+                .unwrap()
+        };
+
+        // read entire file
+        let bytes = bytes_for_range(0..len);
+        reader.try_parse(&bytes).unwrap();
+        let metadata = reader.finish().unwrap();
+        assert!(metadata.column_index.is_some());
+        assert!(metadata.offset_index.is_some());
+
+        // read more than enough of file
+        let bytes = bytes_for_range(320000..len);
+        reader.try_parse_sized(&bytes, len).unwrap();
+        let metadata = reader.finish().unwrap();
+        assert!(metadata.column_index.is_some());
+        assert!(metadata.offset_index.is_some());
+
+        // exactly enough
+        let bytes = bytes_for_range(323583..len);
+        reader.try_parse_sized(&bytes, len).unwrap();
+        let metadata = reader.finish().unwrap();
+        assert!(metadata.column_index.is_some());
+        assert!(metadata.offset_index.is_some());
+
+        // not enough for page index
+        let bytes = bytes_for_range(323584..len);
+        // should fail
+        match reader.try_parse_sized(&bytes, len).unwrap_err() {
+            // expected error, try again with provided bounds
+            ParquetError::NeedMoreData(needed) => {
+                let bytes = bytes_for_range(len - needed..len);
+                reader.try_parse_sized(&bytes, len).unwrap();
+                let metadata = reader.finish().unwrap();
+                assert!(metadata.column_index.is_some());
+                assert!(metadata.offset_index.is_some());
+            }
+            _ => panic!("unexpected error"),
+        };
+
+        // not enough for page index but lie about file size
+        let bytes = bytes_for_range(323584..len);
+        let reader_result = reader.try_parse_sized(&bytes, len - 323584).unwrap_err();
+        assert_eq!(
+            reader_result.to_string(),
+            "EOF: Parquet file too small. Range 323583..452504 is beyond file bounds 130649"
+        );
+
+        // not enough for file metadata
+        let mut reader = ParquetMetaDataReader::new();
+        let bytes = bytes_for_range(452505..len);
+        // should fail
+        match reader.try_parse_sized(&bytes, len).unwrap_err() {
+            // expected error, try again with provided bounds
+            ParquetError::NeedMoreData(needed) => {
+                let bytes = bytes_for_range(len - needed..len);
+                reader.try_parse_sized(&bytes, len).unwrap();
+                reader.finish().unwrap();
+            }
+            _ => panic!("unexpected error"),
+        };
+
+        // not enough for file metadata but use try_parse()
+        let reader_result = reader.try_parse(&bytes).unwrap_err();
+        assert_eq!(
+            reader_result.to_string(),
+            "EOF: Parquet file too small. Size is 1728 but need 1729"
+        );
+
+        // read head of file rather than tail
+        let bytes = bytes_for_range(0..1000);
+        let reader_result = reader.try_parse_sized(&bytes, len).unwrap_err();
+        assert_eq!(
+            reader_result.to_string(),
+            "Parquet error: Invalid Parquet file. Corrupt footer"
Corrupt footer" + ); + + // lie about file size + let bytes = bytes_for_range(452510..len); + let reader_result = reader.try_parse_sized(&bytes, len - 452505).unwrap_err(); + assert_eq!( + reader_result.to_string(), + "EOF: Parquet file too small. Size is 1728 but need 1729" + ); + } + + #[cfg(feature = "async")] + struct MetadataFetchFn(F); + + #[cfg(feature = "async")] + impl MetadataFetch for MetadataFetchFn + where + F: FnMut(Range) -> Fut + Send, + Fut: Future> + Send, + { + fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result> { + async move { self.0(range).await }.boxed() + } + } + + fn read_range(file: &mut File, range: Range) -> Result { + file.seek(SeekFrom::Start(range.start as _))?; + let len = range.end - range.start; + let mut buf = Vec::with_capacity(len); + file.take(len as _).read_to_end(&mut buf)?; + Ok(buf.into()) + } + + #[cfg(feature = "async")] + #[tokio::test] + async fn test_simple() { + let mut file = get_test_file("nulls.snappy.parquet"); + let len = file.len() as usize; + + let expected = ParquetMetaDataReader::new().parse(&file).unwrap(); + let expected = expected.file_metadata().schema(); + let fetch_count = AtomicUsize::new(0); + + let mut fetch = |range| { + fetch_count.fetch_add(1, Ordering::SeqCst); + futures::future::ready(read_range(&mut file, range)) + }; + + let input = MetadataFetchFn(&mut fetch); + let actual = ParquetMetaDataReader::new().load(input, len).await.unwrap(); + assert_eq!(actual.file_metadata().schema(), expected); + assert_eq!(fetch_count.load(Ordering::SeqCst), 2); + + // Metadata hint too small - below footer size + fetch_count.store(0, Ordering::SeqCst); + let input = MetadataFetchFn(&mut fetch); + let actual = ParquetMetaDataReader::new() + .with_prefetch_hint(Some(7)) + .load(input, len) + .await + .unwrap(); + assert_eq!(actual.file_metadata().schema(), expected); + assert_eq!(fetch_count.load(Ordering::SeqCst), 2); + + // Metadata hint too small + fetch_count.store(0, Ordering::SeqCst); + let input = MetadataFetchFn(&mut fetch); + let actual = ParquetMetaDataReader::new() + .with_prefetch_hint(Some(10)) + .load(input, len) + .await + .unwrap(); + assert_eq!(actual.file_metadata().schema(), expected); + assert_eq!(fetch_count.load(Ordering::SeqCst), 2); + + // Metadata hint too large + fetch_count.store(0, Ordering::SeqCst); + let input = MetadataFetchFn(&mut fetch); + let actual = ParquetMetaDataReader::new() + .with_prefetch_hint(Some(500)) + .load(input, len) + .await + .unwrap(); + assert_eq!(actual.file_metadata().schema(), expected); + assert_eq!(fetch_count.load(Ordering::SeqCst), 1); + + // Metadata hint exactly correct + fetch_count.store(0, Ordering::SeqCst); + let input = MetadataFetchFn(&mut fetch); + let actual = ParquetMetaDataReader::new() + .with_prefetch_hint(Some(428)) + .load(input, len) + .await + .unwrap(); + assert_eq!(actual.file_metadata().schema(), expected); + assert_eq!(fetch_count.load(Ordering::SeqCst), 1); + + let input = MetadataFetchFn(&mut fetch); + let err = ParquetMetaDataReader::new() + .load(input, 4) + .await + .unwrap_err() + .to_string(); + assert_eq!(err, "EOF: file size of 4 is less than footer"); + + let input = MetadataFetchFn(&mut fetch); + let err = ParquetMetaDataReader::new() + .load(input, 20) + .await + .unwrap_err() + .to_string(); + assert_eq!(err, "Parquet error: Invalid Parquet file. 
Corrupt footer"); + } + + #[cfg(feature = "async")] + #[tokio::test] + async fn test_page_index() { + let mut file = get_test_file("alltypes_tiny_pages.parquet"); + let len = file.len() as usize; + let fetch_count = AtomicUsize::new(0); + let mut fetch = |range| { + fetch_count.fetch_add(1, Ordering::SeqCst); + futures::future::ready(read_range(&mut file, range)) + }; + + let f = MetadataFetchFn(&mut fetch); + let mut loader = ParquetMetaDataReader::new().with_page_indexes(true); + loader.try_load(f, len).await.unwrap(); + assert_eq!(fetch_count.load(Ordering::SeqCst), 3); + let metadata = loader.finish().unwrap(); + assert!(metadata.offset_index().is_some() && metadata.column_index().is_some()); + + // Prefetch just footer exactly + fetch_count.store(0, Ordering::SeqCst); + let f = MetadataFetchFn(&mut fetch); + let mut loader = ParquetMetaDataReader::new() + .with_page_indexes(true) + .with_prefetch_hint(Some(1729)); + loader.try_load(f, len).await.unwrap(); + assert_eq!(fetch_count.load(Ordering::SeqCst), 2); + let metadata = loader.finish().unwrap(); + assert!(metadata.offset_index().is_some() && metadata.column_index().is_some()); + + // Prefetch more than footer but not enough + fetch_count.store(0, Ordering::SeqCst); + let f = MetadataFetchFn(&mut fetch); + let mut loader = ParquetMetaDataReader::new() + .with_page_indexes(true) + .with_prefetch_hint(Some(130649)); + loader.try_load(f, len).await.unwrap(); + assert_eq!(fetch_count.load(Ordering::SeqCst), 2); + let metadata = loader.finish().unwrap(); + assert!(metadata.offset_index().is_some() && metadata.column_index().is_some()); + + // Prefetch exactly enough + fetch_count.store(0, Ordering::SeqCst); + let f = MetadataFetchFn(&mut fetch); + let metadata = ParquetMetaDataReader::new() + .with_page_indexes(true) + .with_prefetch_hint(Some(130650)) + .load(f, len) + .await + .unwrap(); + assert_eq!(fetch_count.load(Ordering::SeqCst), 1); + assert!(metadata.offset_index().is_some() && metadata.column_index().is_some()); + } +} diff --git a/parquet/src/file/metadata/writer.rs b/parquet/src/file/metadata/writer.rs index 92ce60556c3e..e768b822318c 100644 --- a/parquet/src/file/metadata/writer.rs +++ b/parquet/src/file/metadata/writer.rs @@ -384,9 +384,9 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> { mod tests { use std::sync::Arc; - use crate::file::footer::parse_metadata; use crate::file::metadata::{ - ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataWriter, RowGroupMetaData, + ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter, + RowGroupMetaData, }; use crate::file::properties::{EnabledStatistics, WriterProperties}; use crate::file::reader::{FileReader, SerializedFileReader}; @@ -428,7 +428,7 @@ mod tests { let data = buf.into_inner().freeze(); - let decoded_metadata = parse_metadata(&data).unwrap(); + let decoded_metadata = ParquetMetaDataReader::new().parse(&data).unwrap(); assert!(!has_page_index(&metadata.metadata)); assert_eq!(metadata.metadata, decoded_metadata); @@ -514,7 +514,7 @@ mod tests { /// Temporary function so we can test loading metadata with page indexes /// while we haven't fully figured out how to load it cleanly async fn load_metadata_from_bytes(file_size: usize, data: Bytes) -> ParquetMetaData { - use crate::arrow::async_reader::{MetadataFetch, MetadataLoader}; + use crate::arrow::async_reader::MetadataFetch; use crate::errors::Result as ParquetResult; use futures::future::BoxFuture; use futures::FutureExt; @@ -567,13 +567,12 @@ mod tests { 
             Box::new(AsyncBytes::new(data)),
             file_size - metadata_length..file_size,
         );
-        let metadata = MetadataLoader::load(&mut reader, file_size, None)
+        let mut metadata_reader = ParquetMetaDataReader::new().with_page_indexes(true);
+        metadata_reader
+            .try_load(&mut reader, file_size)
             .await
             .unwrap();
-        let loaded_metadata = metadata.finish();
-        let mut metadata = MetadataLoader::new(&mut reader, loaded_metadata);
-        metadata.load_page_index(true, true).await.unwrap();
-        metadata.finish()
+        metadata_reader.finish().unwrap()
     }
 
     fn check_columns_are_equivalent(left: &ColumnChunkMetaData, right: &ColumnChunkMetaData) {
diff --git a/parquet/src/file/page_index/index_reader.rs b/parquet/src/file/page_index/index_reader.rs
index 395e9afe122c..ca1e0677881b 100644
--- a/parquet/src/file/page_index/index_reader.rs
+++ b/parquet/src/file/page_index/index_reader.rs
@@ -59,6 +59,7 @@ pub fn read_columns_indexes<R: ChunkReader>(
 
     let fetch = match fetch {
         Some(r) => r,
+        // TODO(ets): should this return `Option<Vec<Index>>` to match MetadataLoader?
         None => return Ok(vec![Index::NONE; chunks.len()]),
     };
 
@@ -132,6 +133,7 @@ pub fn read_offset_indexes<R: ChunkReader>(
 
     let fetch = match fetch {
         Some(r) => r,
+        // TODO(ets): should this return `Option<Vec<OffsetIndexMetaData>>` to match MetadataLoader?
        None => return Ok(vec![]),
     };
 
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index b8ee4001a99c..99ebefe6e8ce 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -30,7 +30,6 @@ use crate::errors::{ParquetError, Result};
 use crate::file::page_index::index_reader;
 use crate::file::page_index::offset_index::OffsetIndexMetaData;
 use crate::file::{
-    footer,
     metadata::*,
     properties::{ReaderProperties, ReaderPropertiesPtr},
     reader::*,
@@ -180,7 +179,7 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
     /// Creates file reader from a Parquet file.
     /// Returns error if Parquet file does not exist or is corrupt.
     pub fn new(chunk_reader: R) -> Result<Self> {
-        let metadata = footer::parse_metadata(&chunk_reader)?;
+        let metadata = ParquetMetaDataReader::new().parse(&chunk_reader)?;
         let props = Arc::new(ReaderProperties::builder().build());
         Ok(Self {
             chunk_reader: Arc::new(chunk_reader),
@@ -192,7 +191,7 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
     /// Creates file reader from a Parquet file with read options.
     /// Returns error if Parquet file does not exist or is corrupt.
     pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
-        let metadata = footer::parse_metadata(&chunk_reader)?;
+        let metadata = ParquetMetaDataReader::new().parse(&chunk_reader)?;
         let mut predicates = options.predicates;
         let row_groups = metadata.row_groups().to_vec();
         let mut filtered_row_groups = Vec::<RowGroupMetaData>::new();
diff --git a/parquet/tests/arrow_reader/bad_data.rs b/parquet/tests/arrow_reader/bad_data.rs
index a73864070d9f..1093c82a295c 100644
--- a/parquet/tests/arrow_reader/bad_data.rs
+++ b/parquet/tests/arrow_reader/bad_data.rs
@@ -140,20 +140,23 @@ fn read_file(name: &str) -> Result<usize, ParquetError> {
 #[tokio::test]
 async fn bad_metadata_err() {
     use bytes::Bytes;
-    use parquet::arrow::async_reader::MetadataLoader;
+    use parquet::file::metadata::ParquetMetaDataReader;
 
     let metadata_buffer = Bytes::from_static(include_bytes!("bad_raw_metadata.bin"));
 
     let metadata_length = metadata_buffer.len();
 
     let mut reader = std::io::Cursor::new(&metadata_buffer);
-    let mut loader = MetadataLoader::load(&mut reader, metadata_length, None)
-        .await
-        .unwrap();
-    loader.load_page_index(false, false).await.unwrap();
-    loader.load_page_index(false, true).await.unwrap();
+    let mut loader = ParquetMetaDataReader::new();
+    loader.try_load(&mut reader, metadata_length).await.unwrap();
+    loader = loader.with_page_indexes(false);
+    loader.load_page_index(&mut reader, None).await.unwrap();
 
-    let err = loader.load_page_index(true, false).await.unwrap_err();
+    loader = loader.with_offset_indexes(true);
+    loader.load_page_index(&mut reader, None).await.unwrap();
+
+    loader = loader.with_column_indexes(true);
+    let err = loader.load_page_index(&mut reader, None).await.unwrap_err();
 
     assert_eq!(
         err.to_string(),