
Commit f73a96d

Example of reading and writing parquet metadata outside the file

1 parent: 02d8b98

File tree

2 files changed: +243 -0 lines changed

parquet/Cargo.toml (+6)
@@ -121,6 +121,12 @@ sysinfo = ["dep:sysinfo"]
 # Verify 32-bit CRC checksum when decoding parquet pages
 crc = ["dep:crc32fast"]

+
+[[example]]
+name = "external_metadata"
+required-features = ["arrow", "async"]
+path = "./examples/external_metadata.rs"
+
 [[example]]
 name = "read_parquet"
 required-features = ["arrow"]
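
With this registration in place, the example can be run from a checkout of the repository with something like `cargo run -p parquet --example external_metadata --features "arrow,async"`; the feature flags mirror the `required-features` entry above.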

parquet/examples/external_metadata.rs (+237)
@@ -0,0 +1,237 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_cast::pretty::pretty_format_batches;
use futures::TryStreamExt;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter};
use parquet::file::properties::{EnabledStatistics, WriterProperties};
use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tempfile::TempDir;

/// This example demonstrates advanced usage of the Parquet metadata APIs.
///
/// It is sometimes desirable to copy the metadata of parquet files stored on
/// remote object storage (e.g. S3) to a local file or an in-memory cache, use
/// a query engine like DataFusion to analyze the metadata and determine which
/// file to read, and then read any matching files with a single subsequent
/// object store request.
///
/// # Example Overview
///
/// 1. Reads the metadata of a Parquet file using [`ParquetMetaDataReader`]
///
/// 2. Removes column statistics from the metadata (to make it smaller)
///
/// 3. Stores the metadata in a separate file using [`ParquetMetaDataWriter`]
///
/// 4. Reads the metadata from the separate file and uses it to read the
///    Parquet file, thus avoiding a second IO to read the metadata or
///    reparsing the footer.
///
#[tokio::main(flavor = "current_thread")]
async fn main() -> parquet::errors::Result<()> {
    let tempdir = TempDir::new().unwrap();
    let parquet_path = create_parquet_file(&tempdir);
    let metadata_path = tempdir.path().join("thrift_metadata.dat");

    // In this example, we use a tokio file to mimic an async remote data source
    let mut remote_parquet_file = tokio::fs::File::open(&parquet_path).await?;

    let metadata = get_metadata_from_remote_parquet_file(&mut remote_parquet_file).await;
    println!(
        "Metadata from 'remote' Parquet file into memory: {} bytes",
        metadata.memory_size()
    );

    // now slim down the metadata and write it to a "local" file
    let metadata = prepare_metadata(metadata);
    write_metadata_to_local_file(metadata, &metadata_path);

    // now read the metadata from the local file and use it to read the "remote" Parquet file
    let metadata = read_metadata_from_local_file(&metadata_path);
    println!("Read metadata from file");

    let batches = read_remote_parquet_file_with_metadata(remote_parquet_file, metadata).await;

    // display the results
    let batches_string = pretty_format_batches(&batches).unwrap().to_string();
    let batches_lines: Vec<_> = batches_string.split('\n').collect();

    assert_eq!(
        batches_lines,
        [
            "+-----+-------------+",
            "| id  | description |",
            "+-----+-------------+",
            "| 100 | oranges     |",
            "| 200 | apples      |",
            "| 201 | grapefruit  |",
            "| 300 | bananas     |",
            "| 102 | grapes      |",
            "| 33  | pears       |",
            "+-----+-------------+",
        ],
        "actual output:\n\n{batches_lines:#?}"
    );

    Ok(())
}

/// Reads the metadata from a "remote" parquet file
///
/// Note that this function models reading from a remote file source using a
/// tokio file. In a real application, you would implement [`MetadataFetch`] for
/// your own remote source.
///
/// [`MetadataFetch`]: parquet::arrow::async_reader::MetadataFetch
async fn get_metadata_from_remote_parquet_file(
    remote_file: &mut tokio::fs::File,
) -> ParquetMetaData {
    // the remote source must know the total file size (e.g. from an object store LIST operation)
    let file_size = remote_file.metadata().await.unwrap().len();

    // tell the reader to read the page index
    ParquetMetaDataReader::new()
        .with_page_indexes(true)
        .load_and_finish(remote_file, file_size as usize)
        .await
        .unwrap()
}
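
The tokio file can be passed to `load_and_finish` because the crate provides the necessary `MetadataFetch` impls for its built-in async readers. For a real remote source you would implement the trait yourself. Below is a minimal sketch over an in-memory buffer; the `InMemoryFetch` type is hypothetical and the trait signature is assumed from `parquet::arrow::async_reader` as of this commit (it may differ in other releases):

use bytes::Bytes;
use futures::future::BoxFuture;
use futures::FutureExt;
use parquet::arrow::async_reader::MetadataFetch;
use std::ops::Range;

/// Hypothetical fetcher that serves byte ranges from an in-memory copy of
/// the file (e.g. one previously downloaded from object storage)
struct InMemoryFetch {
    data: Bytes,
}

impl MetadataFetch for InMemoryFetch {
    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        // Bytes::slice is cheap: it returns a view into the same allocation
        let bytes = self.data.slice(range);
        async move { Ok(bytes) }.boxed()
    }
}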

/// Modifies the metadata to reduce its size
fn prepare_metadata(metadata: ParquetMetaData) -> ParquetMetaData {
    let orig_size = metadata.memory_size();

    let mut builder = metadata.into_builder();

    // remove column statistics to reduce the size of the metadata by converting
    // the various structures into their respective builders and modifying them
    // as needed.
    for row_group in builder.take_row_groups() {
        let mut row_group_builder = row_group.into_builder();
        for column in row_group_builder.take_columns() {
            let column = column.into_builder().clear_statistics().build().unwrap();
            row_group_builder = row_group_builder.add_column_metadata(column);
        }
        let row_group = row_group_builder.build().unwrap();
        builder = builder.add_row_group(row_group);
    }
    let metadata = builder.build();

    // verify that the size has indeed been reduced
    let new_size = metadata.memory_size();
    assert!(new_size < orig_size, "metadata size did not decrease");
    println!("Reduced metadata size from {} to {}", orig_size, new_size);
    metadata
}

/// Writes the metadata to a file
///
/// The data is stored using the same thrift format as the Parquet file metadata
fn write_metadata_to_local_file(metadata: ParquetMetaData, file: impl AsRef<Path>) {
    let file = File::create(file).unwrap();
    ParquetMetaDataWriter::new(file, &metadata)
        .finish()
        .unwrap()
}

/// Reads the metadata from a file
///
/// This function reads the format written by `write_metadata_to_local_file`
fn read_metadata_from_local_file(file: impl AsRef<Path>) -> ParquetMetaData {
    let file = File::open(file).unwrap();
    ParquetMetaDataReader::new()
        .with_column_indexes(true)
        .with_offset_indexes(true)
        .parse_and_finish(&file)
        .unwrap()
}
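
Because `ParquetMetaDataWriter` accepts any `Write` sink and `ParquetMetaDataReader::parse_and_finish` accepts any `ChunkReader`, the same round trip can also be done entirely in memory, e.g. for a metadata cache. A minimal sketch (the helper below is hypothetical, and assumes the crate's `ChunkReader` impl for `Bytes`):

use bytes::Bytes;

/// Hypothetical in-memory round trip of the thrift-encoded metadata
fn roundtrip_metadata_in_memory(metadata: ParquetMetaData) -> ParquetMetaData {
    // write the metadata to an in-memory buffer instead of a file
    let mut buffer = Vec::new();
    ParquetMetaDataWriter::new(&mut buffer, &metadata)
        .finish()
        .unwrap();

    // parse it back, preserving the column and offset indexes
    ParquetMetaDataReader::new()
        .with_column_indexes(true)
        .with_offset_indexes(true)
        .parse_and_finish(&Bytes::from(buffer))
        .unwrap()
}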

/// Reads the "remote" Parquet file using the metadata
///
/// This shows how to read the Parquet file using previously read metadata
/// instead of the metadata in the Parquet file itself. This avoids an IO to
/// fetch and decode the metadata from the Parquet file before beginning to
/// read it.
///
/// Note that this function models reading from a remote file source using a
/// tokio file. In a real application, you would implement [`AsyncFileReader`]
/// for your own remote source.
///
/// In this example, we simply buffer the results in memory as Arrow record
/// batches, but a real application would likely process the batches as they
/// are read.
///
/// [`AsyncFileReader`]: parquet::arrow::async_reader::AsyncFileReader
async fn read_remote_parquet_file_with_metadata(
    remote_file: tokio::fs::File,
    metadata: ParquetMetaData,
) -> Vec<RecordBatch> {
    let options = ArrowReaderOptions::new()
        // tell the reader to read the page index
        .with_page_index(true);
    // create a reader with the pre-existing metadata
    let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
    let reader =
        ParquetRecordBatchStreamBuilder::new_with_metadata(remote_file, arrow_reader_metadata)
            .build()
            .unwrap();

    reader.try_collect::<Vec<_>>().await.unwrap()
}
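
As with `MetadataFetch`, the tokio file qualifies as an `AsyncFileReader` through the crate's blanket implementations. A real application would implement the trait for its remote source directly. Here is a minimal sketch over an in-memory buffer, reusing `Arc` and `ParquetMetaData` from the imports above; the `InMemoryReader` type is hypothetical and the method signatures are assumed from `parquet::arrow::async_reader` as of this commit:

use bytes::Bytes;
use futures::future::BoxFuture;
use futures::FutureExt;
use parquet::arrow::async_reader::AsyncFileReader;
use std::ops::Range;

/// Hypothetical reader that serves bytes from an in-memory copy of the
/// file and returns metadata loaded ahead of time (e.g. from the sidecar
/// file written above)
struct InMemoryReader {
    data: Bytes,
    metadata: Arc<ParquetMetaData>,
}

impl AsyncFileReader for InMemoryReader {
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        // serve the requested byte range directly from the buffer
        let bytes = self.data.slice(range);
        async move { Ok(bytes) }.boxed()
    }

    fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
        // hand back the cached metadata rather than re-reading the footer
        let metadata = Arc::clone(&self.metadata);
        async move { Ok(metadata) }.boxed()
    }
}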
201+
202+
/// Make a new parquet file in the temporary directory, and returns the path
203+
fn create_parquet_file(tmpdir: &TempDir) -> PathBuf {
204+
let path = tmpdir.path().join("example.parquet");
205+
let new_file = File::create(&path).unwrap();
206+
207+
let batch = RecordBatch::try_from_iter(vec![
208+
(
209+
"id",
210+
Arc::new(Int32Array::from(vec![100, 200, 201, 300, 102, 33])) as ArrayRef,
211+
),
212+
(
213+
"description",
214+
Arc::new(StringArray::from(vec![
215+
"oranges",
216+
"apples",
217+
"grapefruit",
218+
"bannanas",
219+
"grapes",
220+
"pears",
221+
])),
222+
),
223+
])
224+
.unwrap();
225+
226+
let props = WriterProperties::builder()
227+
// ensure we write the page index level statistics
228+
.set_statistics_enabled(EnabledStatistics::Page)
229+
.build();
230+
231+
let mut writer = ArrowWriter::try_new(new_file, batch.schema(), Some(props)).unwrap();
232+
233+
writer.write(&batch).unwrap();
234+
writer.finish().unwrap();
235+
236+
path
237+
}
