Skip to content

Commit e1d54e4

Browse files
committed
Example of reading and writing parquet metadata outside the file
1 parent b9cf3c5 commit e1d54e4

File tree

5 files changed

+284
-2
lines changed

5 files changed

+284
-2
lines changed

parquet/Cargo.toml

+6
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,12 @@ sysinfo = ["dep:sysinfo"]
121121
# Verify 32-bit CRC checksum when decoding parquet pages
122122
crc = ["dep:crc32fast"]
123123

124+
125+
[[example]]
126+
name = "external_metadata"
127+
required-features = ["arrow", "async"]
128+
path = "./examples/external_metadata.rs"
129+
124130
[[example]]
125131
name = "read_parquet"
126132
required-features = ["arrow"]

parquet/examples/external_metadata.rs

+221
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
19+
use arrow_cast::pretty::pretty_format_batches;
20+
use futures::TryStreamExt;
21+
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
22+
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
23+
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter};
24+
use parquet::file::properties::{EnabledStatistics, WriterProperties};
25+
use std::fs::File;
26+
use std::path::{Path, PathBuf};
27+
use std::sync::Arc;
28+
use tempfile::TempDir;
29+
30+
/// This example demonstrates advanced usage of the Parquet metadata APIs.
31+
///
32+
/// For example, you can copy the metadata for parquet files stored on remote
33+
/// object storage (e.g. S3) to a local file or an in-memory cache, use a query
34+
/// engine like DataFusion to analyze the metadata to determine which file to
35+
/// read, and then read any matching files with a single object store request.
36+
///
37+
/// # Use case
38+
///
39+
/// 1. Read Parquet metadata from Parquet file stored on a remote source
40+
///
41+
/// 2. Store that metadata somewhere "locally" that is faster to access than
42+
/// the rest of the parquet data (e.g. a cache).
43+
///
44+
/// 3. Use the metadata to determine if the file should be read, and if so, read
45+
/// the remote Parquet file, without re-reading / reparsing the metadata footer.
46+
///
47+
/// # Example Overview
48+
/// 1. Reads the metadata of a Parquet file
49+
///
50+
/// 2. Removes some column statistics from the metadata (to make them smaller)
51+
///
52+
/// 3. Stores the metadata in a separate file
53+
///
54+
/// 4. Reads the metadata from the separate file and uses that to read the
55+
/// Parquet file
56+
#[tokio::main(flavor = "current_thread")]
57+
async fn main() -> parquet::errors::Result<()> {
58+
let tempdir = TempDir::new().unwrap();
59+
println!("data in {tempdir:?}");
60+
let parquet_path = create_parquet_file(&tempdir);
61+
let metadata_path = tempdir.path().join("thrift_metadata.dat");
62+
63+
// In this example, we use a tokio file to mimic an async remote data source
64+
let mut remote_parquet_file = tokio::fs::File::open(&parquet_path).await?;
65+
66+
let metadata = get_metadata_from_remote_parquet_file(&mut remote_parquet_file).await;
67+
println!(
68+
"Metadata from 'remote' Parquet file into memory: {} bytes",
69+
metadata.memory_size()
70+
);
71+
72+
// now slim down the metadata and write it to a "local" file
73+
let metadata = prepare_metadata(metadata);
74+
write_metadata_to_local_file(metadata, &metadata_path);
75+
76+
// now read the metadata from the local file and use it to read the "remote" Parquet file
77+
let metadata = read_metadata_from_local_file(&metadata_path);
78+
println!("Read metadata from file: {metadata:#?}");
79+
80+
let batches = read_remote_parquet_file_with_metadata(remote_parquet_file, metadata).await;
81+
82+
// display the results
83+
let batches_string = pretty_format_batches(&batches).unwrap().to_string();
84+
let batches_lines: Vec<_> = batches_string.split('\n').collect();
85+
86+
assert_eq!(
87+
batches_lines,
88+
[
89+
"+-----+-------------+",
90+
"| id | description |",
91+
"+-----+-------------+",
92+
"| 100 | oranges |",
93+
"| 200 | apples |",
94+
"| 201 | grapefruit |",
95+
"| 300 | bannanas |",
96+
"| 102 | grapes |",
97+
"| 33 | pears |",
98+
"+-----+-------------+",
99+
],
100+
"actual output:\n\n{batches_lines:#?}"
101+
);
102+
103+
Ok(())
104+
}
105+
106+
/// Reads the metadata from a "remote" parquet file
107+
///
108+
/// Note that this function models reading from a remote file source using a
109+
/// tokio file. In a real application, you would implement [`MetadataFetch`] for
110+
/// your own remote source.
111+
///
112+
/// [`MetadataFetch`]: parquet::arrow::async_reader::MetadataFetch
113+
async fn get_metadata_from_remote_parquet_file(
114+
remote_file: &mut tokio::fs::File,
115+
) -> ParquetMetaData {
116+
// the remote source must know the total file size (e.g. from an object store LIST operation)
117+
let file_size = remote_file.metadata().await.unwrap().len();
118+
119+
// tell the reader to read the page index
120+
ParquetMetaDataReader::new()
121+
.with_page_indexes(true)
122+
.load_and_finish(remote_file, file_size as usize)
123+
.await
124+
.unwrap()
125+
}
126+
127+
/// modifies the metadata to reduce its size
128+
fn prepare_metadata(metadata: ParquetMetaData) -> ParquetMetaData {
129+
// TODO
130+
metadata
131+
}
132+
133+
/// writes the metadata to a file
134+
///
135+
/// The data is stored using the same thrift format as the Parquet file metadata
136+
fn write_metadata_to_local_file(metadata: ParquetMetaData, file: impl AsRef<Path>) {
137+
let file = File::create(file).unwrap();
138+
ParquetMetaDataWriter::new(file, &metadata)
139+
.finish()
140+
.unwrap()
141+
}
142+
143+
/// Reads the metadata from a file
144+
///
145+
/// This function reads the format written by `write_metadata_to_local_file`
146+
fn read_metadata_from_local_file(file: impl AsRef<Path>) -> ParquetMetaData {
147+
let file = File::open(file).unwrap();
148+
ParquetMetaDataReader::new()
149+
.with_column_indexes(true)
150+
.with_offset_indexes(true)
151+
.parse_and_finish(&file)
152+
.unwrap()
153+
}
154+
155+
/// Reads the "remote" Parquet file using the metadata
156+
///
157+
/// This shows how to read the Parquet file using previously read metadata
158+
/// instead of the metadata in the Parquet file itself. This avoids an IO /
159+
/// having to fetch and decode the metadata from the Parquet file before
160+
/// beginning to read it.
161+
///
162+
/// Note that this function models reading from a remote file source using a
163+
/// tokio file. In a real application, you would implement [`AsyncFileReader`]
164+
/// for your own remote source.
165+
///
166+
/// In this example, we simply read the results as Arrow record batches
167+
///
168+
/// [`AsyncFileReader`]: parquet::arrow::async_reader::AsyncFileReader
169+
async fn read_remote_parquet_file_with_metadata(
170+
remote_file: tokio::fs::File,
171+
metadata: ParquetMetaData,
172+
) -> Vec<RecordBatch> {
173+
let options = ArrowReaderOptions::new()
174+
// tell the reader to read the page index
175+
.with_page_index(true);
176+
// create a reader with pre-existing metadata
177+
let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
178+
let reader =
179+
ParquetRecordBatchStreamBuilder::new_with_metadata(remote_file, arrow_reader_metadata)
180+
.build()
181+
.unwrap();
182+
183+
reader.try_collect::<Vec<_>>().await.unwrap()
184+
}
185+
186+
/// Makes a new parquet file in the temporary directory, and returns the path
187+
fn create_parquet_file(tmpdir: &TempDir) -> PathBuf {
188+
let path = tmpdir.path().join("example.parquet");
189+
let new_file = File::create(&path).unwrap();
190+
191+
let batch = RecordBatch::try_from_iter(vec![
192+
(
193+
"id",
194+
Arc::new(Int32Array::from(vec![100, 200, 201, 300, 102, 33])) as ArrayRef,
195+
),
196+
(
197+
"description",
198+
Arc::new(StringArray::from(vec![
199+
"oranges",
200+
"apples",
201+
"grapefruit",
202+
"bannanas",
203+
"grapes",
204+
"pears",
205+
])),
206+
),
207+
])
208+
.unwrap();
209+
210+
let props = WriterProperties::builder()
211+
// ensure we write the page index level statistics
212+
.set_statistics_enabled(EnabledStatistics::Page)
213+
.build();
214+
215+
let mut writer = ArrowWriter::try_new(new_file, batch.schema(), Some(props)).unwrap();
216+
217+
writer.write(&batch).unwrap();
218+
writer.finish().unwrap();
219+
220+
path
221+
}

parquet/src/arrow/async_reader/metadata.rs

+37-1
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,44 @@ use std::future::Future;
2828
use std::ops::Range;
2929

3030
/// A data source that can be used with [`MetadataLoader`] to load [`ParquetMetaData`]
31+
///
32+
/// Note that an implementation is provided for [`AsyncFileReader`].
33+
///
34+
/// # Example `MetadataFetch` for a custom async data source
35+
///
36+
/// ```rust
37+
/// # use parquet::errors::Result;
38+
/// # use parquet::arrow::async_reader::MetadataFetch;
39+
/// # use bytes::Bytes;
40+
/// # use std::ops::Range;
41+
/// # use std::io::SeekFrom;
42+
/// # use futures::future::BoxFuture;
43+
/// # use futures::FutureExt;
44+
/// # use tokio::io::{AsyncReadExt, AsyncSeekExt};
45+
/// // Adapter that implements the API for reading bytes from an async source (in
46+
/// // this case a tokio::fs::File)
47+
/// struct TokioFileMetadata {
48+
/// file: tokio::fs::File,
49+
/// }
50+
/// impl MetadataFetch for TokioFileMetadata {
51+
/// fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
52+
/// // return a future that fetches data in range
53+
/// async move {
54+
/// let mut buf = vec![0; range.end - range.start]; // target buffer
55+
/// // seek to the start of the range and read the data
56+
/// self.file.seek(SeekFrom::Start(range.start as u64)).await?;
57+
/// self.file.read_exact(&mut buf).await?;
58+
/// Ok(Bytes::from(buf)) // convert to Bytes
59+
/// }
60+
/// .boxed() // turn into BoxFuture, using FutureExt::boxed
61+
/// }
62+
/// }
63+
///```
3164
pub trait MetadataFetch {
32-
/// Fetches a range of bytes asynchronously
65+
/// Return a future that fetches the specified range of bytes asynchronously
66+
///
67+
/// Note the returned type is a boxed future, often created by
68+
/// [FutureExt::boxed]. See the trait documentation for an example
3369
fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
3470
}
3571

parquet/src/arrow/async_reader/mod.rs

+12
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,18 @@ use crate::arrow::schema::ParquetField;
121121
pub use store::*;
122122

123123
/// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files
124+
///
125+
/// Notes:
126+
///
127+
/// 1. There is a default implementation for types that implement [`AsyncRead`]
128+
/// and [`AsyncSeek`], for example [`tokio::fs::File`].
129+
///
130+
/// 2. [`ParquetObjectReader`], available when the `object_store` crate feature
131+
/// is activated, implements this interface for [`ObjectStore`].
132+
///
133+
/// [`ObjectStore`]: object_store::ObjectStore
134+
///
135+
/// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html
124136
pub trait AsyncFileReader: Send {
125137
/// Retrieve the bytes in `range`
126138
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;

parquet/src/file/metadata/reader.rs

+8-1
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,18 @@ use crate::arrow::async_reader::MetadataFetch;
3939
/// See [`crate::file::metadata::ParquetMetaDataWriter#output-format`] for a description of
4040
/// the Parquet metadata.
4141
///
42+
/// Parquet metadata is not necessarily contiguous in the files: part is stored
43+
/// in the footer (the last bytes of the file), but other portions (such as the
44+
/// PageIndex) can be stored elsewhere.
45+
///
46+
/// This reader handles reading the footer as well as the non-contiguous parts
47+
/// of the metadata such as the page indexes.
48+
///
4249
/// # Example
4350
/// ```no_run
4451
/// # use parquet::file::metadata::ParquetMetaDataReader;
4552
/// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
46-
/// // read parquet metadata including page indexes
53+
/// // read parquet metadata including page indexes from a file
4754
/// let file = open_parquet_file("some_path.parquet");
4855
/// let mut reader = ParquetMetaDataReader::new()
4956
/// .with_page_indexes(true);

0 commit comments

Comments
 (0)