Skip to content

Commit 3a45ae9

Browse files
corwinjoyadamreeve
andauthored
Remove AsyncFileReader::get_metadata_with_options, add options to AsyncFileReader::get_metadata (#7342)
* Remove default implementation for get_metadata_with_options and explain why. * Improve comment about missing implementation. * Update get_metadata method to take an optional ArrowReaderOptions. * Fix handling of unencrypted footer. * Checking git tests, it seems I missed a couple dead calls. Fixed. * Fix path with no encryption enabled. * Make have_decryptor a bit more bullet-proof. * Update expected error message in test_decrypting_without_decryption_properties_fails. * Tidy up get_metadata * Fix merge conflicts. --------- Co-authored-by: Adam Reeve <[email protected]>
1 parent 5b44d67 commit 3a45ae9

File tree

6 files changed

+28
-85
lines changed

6 files changed

+28
-85
lines changed

parquet/examples/read_with_rowgroup.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ async fn main() -> Result<()> {
3535
let mut file = File::open(&path).await.unwrap();
3636

3737
// The metadata could be cached in other places, this example only shows how to read
38-
let metadata = file.get_metadata().await?;
38+
let metadata = file.get_metadata(None).await?;
3939

4040
for rg in metadata.row_groups() {
4141
let mut rowgroup = InMemoryRowGroup::create(rg.clone(), ProjectionMask::all());

parquet/src/arrow/async_reader/mod.rs

+16-59
Original file line numberDiff line numberDiff line change
@@ -101,20 +101,11 @@ pub trait AsyncFileReader: Send {
101101
/// Provides asynchronous access to the [`ParquetMetaData`] of a parquet file,
102102
/// allowing fine-grained control over how metadata is sourced, in particular allowing
103103
/// for caching, pre-fetching, catalog metadata, etc...
104-
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;
105-
106-
/// Provides asynchronous access to the [`ParquetMetaData`] of a parquet file,
107-
/// allowing fine-grained control over how metadata is sourced, in particular allowing
108-
/// for caching, pre-fetching, catalog metadata, decrypting, etc...
109-
///
110-
/// By default calls `get_metadata()`
111-
fn get_metadata_with_options<'a>(
104+
/// ArrowReaderOptions may be provided to supply decryption parameters
105+
fn get_metadata<'a>(
112106
&'a mut self,
113-
options: &'a ArrowReaderOptions,
114-
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
115-
let _ = options;
116-
self.get_metadata()
117-
}
107+
options: Option<&'a ArrowReaderOptions>,
108+
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
118109
}
119110

120111
/// This allows Box<dyn AsyncFileReader + '_> to be used as an AsyncFileReader,
@@ -127,15 +118,11 @@ impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
127118
self.as_mut().get_byte_ranges(ranges)
128119
}
129120

130-
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
131-
self.as_mut().get_metadata()
132-
}
133-
134-
fn get_metadata_with_options<'a>(
121+
fn get_metadata<'a>(
135122
&'a mut self,
136-
options: &'a ArrowReaderOptions,
123+
options: Option<&'a ArrowReaderOptions>,
137124
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
138-
self.as_mut().get_metadata_with_options(options)
125+
self.as_mut().get_metadata(options)
139126
}
140127
}
141128

@@ -156,9 +143,9 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
156143
.boxed()
157144
}
158145

159-
fn get_metadata_with_options<'a>(
146+
fn get_metadata<'a>(
160147
&'a mut self,
161-
options: &'a ArrowReaderOptions,
148+
options: Option<&'a ArrowReaderOptions>,
162149
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
163150
const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64;
164151
async move {
@@ -169,6 +156,7 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
169156

170157
let footer = ParquetMetaDataReader::decode_footer_tail(&buf)?;
171158
let metadata_len = footer.metadata_length();
159+
172160
self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
173161
.await?;
174162

@@ -178,43 +166,16 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
178166
let metadata_reader = ParquetMetaDataReader::new();
179167

180168
#[cfg(feature = "encryption")]
181-
let metadata_reader = metadata_reader
182-
.with_decryption_properties(options.file_decryption_properties.as_ref());
169+
let metadata_reader = metadata_reader.with_decryption_properties(
170+
options.and_then(|o| o.file_decryption_properties.as_ref()),
171+
);
183172

184173
let parquet_metadata = metadata_reader.decode_footer_metadata(&buf, &footer)?;
185174

186175
Ok(Arc::new(parquet_metadata))
187176
}
188177
.boxed()
189178
}
190-
191-
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
192-
const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64;
193-
async move {
194-
self.seek(SeekFrom::End(-FOOTER_SIZE_I64)).await?;
195-
196-
let mut buf = [0_u8; FOOTER_SIZE];
197-
self.read_exact(&mut buf).await?;
198-
199-
let footer = ParquetMetaDataReader::decode_footer_tail(&buf)?;
200-
let metadata_len = footer.metadata_length();
201-
202-
if footer.is_encrypted_footer() {
203-
return Err(general_err!(
204-
"Parquet file has an encrypted footer but decryption properties were not provided"
205-
));
206-
}
207-
208-
self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
209-
.await?;
210-
211-
let mut buf = Vec::with_capacity(metadata_len);
212-
self.take(metadata_len as _).read_to_end(&mut buf).await?;
213-
214-
Ok(Arc::new(ParquetMetaDataReader::decode_metadata(&buf)?))
215-
}
216-
.boxed()
217-
}
218179
}
219180

220181
impl ArrowReaderMetadata {
@@ -233,7 +194,7 @@ impl ArrowReaderMetadata {
233194
) -> Result<Self> {
234195
// TODO: this is all rather awkward. It would be nice if AsyncFileReader::get_metadata
235196
// took an argument to fetch the page indexes.
236-
let mut metadata = input.get_metadata_with_options(&options).await?;
197+
let mut metadata = input.get_metadata(Some(&options)).await?;
237198

238199
if options.page_index
239200
&& metadata.column_index().is_none()
@@ -1169,13 +1130,9 @@ mod tests {
11691130
futures::future::ready(Ok(self.data.slice(range))).boxed()
11701131
}
11711132

1172-
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
1173-
futures::future::ready(Ok(self.metadata.clone())).boxed()
1174-
}
1175-
1176-
fn get_metadata_with_options<'a>(
1133+
fn get_metadata<'a>(
11771134
&'a mut self,
1178-
_options: &'a ArrowReaderOptions,
1135+
_options: Option<&'a ArrowReaderOptions>,
11791136
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
11801137
futures::future::ready(Ok(self.metadata.clone())).boxed()
11811138
}

parquet/src/arrow/async_reader/store.rs

+7-21
Original file line numberDiff line numberDiff line change
@@ -194,35 +194,21 @@ impl AsyncFileReader for ParquetObjectReader {
194194
// an `impl MetadataFetch` and calls those methods to get data from it. Due to `Self`'s impl of
195195
// `AsyncFileReader`, the calls to `MetadataFetch::fetch` are just delegated to
196196
// `Self::get_bytes`.
197-
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
198-
Box::pin(async move {
199-
let metadata_reader = ParquetMetaDataReader::new()
200-
.with_column_indexes(self.preload_column_index)
201-
.with_offset_indexes(self.preload_offset_index)
202-
.with_prefetch_hint(self.metadata_size_hint);
203-
let metadata = if let Some(file_size) = self.file_size {
204-
metadata_reader.load_and_finish(self, file_size).await?
205-
} else {
206-
metadata_reader.load_via_suffix_and_finish(self).await?
207-
};
208-
209-
Ok(Arc::new(metadata))
210-
})
211-
}
212-
213-
fn get_metadata_with_options<'a>(
197+
fn get_metadata<'a>(
214198
&'a mut self,
215-
options: &'a ArrowReaderOptions,
199+
options: Option<&'a ArrowReaderOptions>,
216200
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
217201
Box::pin(async move {
218-
let metadata = ParquetMetaDataReader::new()
202+
let mut metadata = ParquetMetaDataReader::new()
219203
.with_column_indexes(self.preload_column_index)
220204
.with_offset_indexes(self.preload_offset_index)
221205
.with_prefetch_hint(self.metadata_size_hint);
222206

223207
#[cfg(feature = "encryption")]
224-
let metadata =
225-
metadata.with_decryption_properties(options.file_decryption_properties.as_ref());
208+
if let Some(options) = options {
209+
metadata = metadata
210+
.with_decryption_properties(options.file_decryption_properties.as_ref());
211+
}
226212

227213
let metadata = if let Some(file_size) = self.file_size {
228214
metadata.load_and_finish(self, file_size).await?

parquet/src/file/metadata/reader.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -834,7 +834,7 @@ impl ParquetMetaDataReader {
834834

835835
file_decryptor = Some(decryptor);
836836
} else {
837-
return Err(general_err!("Parquet file has an encrypted footer but no decryption properties were provided"));
837+
return Err(general_err!("Parquet file has an encrypted footer but decryption properties were not provided"));
838838
}
839839
}
840840

parquet/tests/arrow_reader/encryption.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ fn test_decrypting_without_decryption_properties_fails() {
154154
assert!(result.is_err());
155155
assert_eq!(
156156
result.unwrap_err().to_string(),
157-
"Parquet error: Parquet file has an encrypted footer but no decryption properties were provided"
157+
"Parquet error: Parquet file has an encrypted footer but decryption properties were not provided"
158158
);
159159
}
160160

parquet/tests/arrow_reader/encryption_async.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ async fn test_decrypting_without_decryption_properties_fails() {
239239
assert!(result.is_err());
240240
assert_eq!(
241241
result.unwrap_err().to_string(),
242-
"Parquet error: Parquet file has an encrypted footer but no decryption properties were provided"
242+
"Parquet error: Parquet file has an encrypted footer but decryption properties were not provided"
243243
);
244244
}
245245

@@ -277,7 +277,7 @@ async fn test_read_encrypted_file_from_object_store() {
277277
let options = ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties);
278278

279279
let mut reader = ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);
280-
let metadata = reader.get_metadata_with_options(&options).await.unwrap();
280+
let metadata = reader.get_metadata(Some(&options)).await.unwrap();
281281
let builder = ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
282282
.await
283283
.unwrap();

0 commit comments

Comments
 (0)