// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_cast::pretty::pretty_format_batches;
use parquet::arrow::arrow_reader::{
    ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
};
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter};
use parquet::file::properties::{EnabledStatistics, WriterProperties};
use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tempfile::TempDir;

/// This example demonstrates advanced usage of the Parquet metadata APIs.
///
/// # Use case
///
/// 1. Read Parquet metadata from an existing Parquet file.
///
/// 2. Store that metadata somewhere other than with the rest of the Parquet
///    data (e.g. in a cache), not necessarily in RAM.
///
/// 3. Use the metadata to determine if the file should be read, and if so,
///    read the data stored in that Parquet file without re-reading /
///    re-parsing the metadata.
///
/// Note that Parquet metadata is not necessarily contiguous in a file: part is
/// stored in the footer (the last bytes of the file), but other portions (such
/// as the PageIndex) can be stored elsewhere.
///
/// You can use these APIs to store a copy of the metadata for Parquet files
/// stored on remote object storage (e.g. S3) in a local file or an in-memory
/// cache, use a query engine like DataFusion to analyze the metadata to
/// determine which files to read, and then read those files with a single
/// object store request.
///
/// # Specifically, this example:
/// 1. Reads the metadata of a Parquet file
/// 2. Prepares the metadata for storage (a placeholder step here, where, for
///    example, column statistics could be removed to make the metadata smaller)
/// 3. Stores the metadata in a separate file
/// 4. Reads the metadata from the separate file and uses it to read the
///    Parquet file
///
/// Without this API, implementing this functionality would require converting
/// the `ParquetMetaData` and related structures to/from other structs that
/// can be serialized/deserialized.
#[tokio::main(flavor = "current_thread")]
async fn main() -> parquet::errors::Result<()> {
    let tempdir = TempDir::new().unwrap();
    println!("data in {tempdir:?}");
    let parquet_path = create_parquet_file(&tempdir);
    let metadata_path = tempdir.path().join("thrift_metadata.dat");

    let metadata = get_metadata_from_parquet_file(&parquet_path).await;
    println!(
        "Read metadata from Parquet file into memory: {} bytes",
        metadata.memory_size()
    );
    let metadata = prepare_metadata(metadata);
    write_metadata_to_file(metadata, &metadata_path);

    // now read the metadata from the file and use it to read the Parquet file
    let metadata = read_metadata_from_file(&metadata_path);
    println!("Read metadata from file: {metadata:#?}");

    let batches = read_parquet_file_with_metadata(&parquet_path, metadata);

    // display the results
    let batches_string = pretty_format_batches(&batches).unwrap().to_string();
    let batches_lines: Vec<_> = batches_string.split('\n').collect();

    assert_eq!(
        batches_lines,
        [
            "+-----+-------------+",
            "| id  | description |",
            "+-----+-------------+",
            "| 100 | oranges     |",
            "| 200 | apples      |",
            "| 201 | grapefruit  |",
            "| 300 | bananas     |",
            "| 102 | grapes      |",
            "| 33  | pears       |",
            "+-----+-------------+",
        ],
        "actual output:\n\n{batches_lines:#?}"
    );

    Ok(())
}

/// Reads the metadata from a parquet file
async fn get_metadata_from_parquet_file(file: impl AsRef<Path>) -> ParquetMetaData {
    // pretend we are reading the metadata from a remote object store
    let file = std::fs::File::open(file).unwrap();
    let file = tokio::fs::File::from_std(file);
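    // (in a real system this might instead be an object store reader, e.g.
    // `ParquetObjectReader`; the local file here just stands in for remote
    // storage)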

    // tell the reader to read the page index
    let reader_options = ArrowReaderOptions::new().with_page_index(true);
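    // (because the page index is requested here, the copy of the metadata we
    // cache below will include the column and offset indexes)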

    let builder = ParquetRecordBatchStreamBuilder::new_with_options(file, reader_options)
        .await
        .unwrap();

    // The metadata is Arc'd -- so unwrap it after dropping the builder
    let metadata = Arc::clone(builder.metadata());
    drop(builder);
    Arc::try_unwrap(metadata).unwrap()
}

/// Prepares the metadata for storage (for example, by reducing its size)
fn prepare_metadata(metadata: ParquetMetaData) -> ParquetMetaData {
    // A placeholder: the metadata is passed through unchanged here, but this
    // is where statistics could be removed or other rewrites applied before
    // storing it.
    metadata
}
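
// A minimal sketch of the kind of inspection `prepare_metadata` might do when
// deciding what to strip: count how many column chunks carry statistics. It
// uses only public `ParquetMetaData` accessors (`row_groups`, `columns`,
// `statistics`); the function itself is illustrative and is not called by
// this example.
#[allow(dead_code)]
fn count_column_chunk_statistics(metadata: &ParquetMetaData) -> usize {
    metadata
        .row_groups()
        .iter()
        .flat_map(|rg| rg.columns())
        .filter(|col| col.statistics().is_some())
        .count()
}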

/// Writes the metadata to a file
///
/// The data is stored using the same Thrift format as the Parquet file metadata
fn write_metadata_to_file(metadata: ParquetMetaData, file: impl AsRef<Path>) {
    let file = File::create(file).unwrap();
    let writer = ParquetMetaDataWriter::new(file, &metadata);
    writer.finish().unwrap()
}

/// Reads the metadata from a file
///
/// This function reads the format written by `write_metadata_to_file`
fn read_metadata_from_file(file: impl AsRef<Path>) -> ParquetMetaData {
    let file = File::open(file).unwrap();
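    // also load the column and offset indexes (the "page index"), which
    // `ParquetMetaDataWriter` stores alongside the footer metadata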
    ParquetMetaDataReader::new()
        .with_column_indexes(true)
        .with_offset_indexes(true)
        .parse_and_finish(&file)
        .unwrap()
}

/// Reads the Parquet file using the metadata
///
/// This shows how to read the Parquet file using previously read metadata
/// instead of the metadata in the Parquet file itself. This avoids additional
/// I/O: the metadata does not have to be fetched and decoded from the Parquet
/// file before beginning to read it.
///
/// In this example, we read the results as Arrow record batches
fn read_parquet_file_with_metadata(
    file: impl AsRef<Path>,
    metadata: ParquetMetaData,
) -> Vec<RecordBatch> {
    let file = std::fs::File::open(file).unwrap();
    let options = ArrowReaderOptions::new()
        // tell the reader to read the page index
        .with_page_index(true);
    // create a reader with pre-existing metadata
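    // (`into()` wraps the `ParquetMetaData` in the `Arc<ParquetMetaData>`
    // expected by `ArrowReaderMetadata::try_new`)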
    let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
    let reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, arrow_reader_metadata)
        .build()
        .unwrap();

    reader.collect::<arrow::error::Result<Vec<_>>>().unwrap()
}

/// Makes a new parquet file in the temporary directory, and returns its path
fn create_parquet_file(tmpdir: &TempDir) -> PathBuf {
    let path = tmpdir.path().join("example.parquet");
    let new_file = File::create(&path).unwrap();

    let batch = RecordBatch::try_from_iter(vec![
        (
            "id",
            Arc::new(Int32Array::from(vec![100, 200, 201, 300, 102, 33])) as ArrayRef,
        ),
        (
            "description",
            Arc::new(StringArray::from(vec![
                "oranges",
                "apples",
                "grapefruit",
                "bananas",
                "grapes",
                "pears",
            ])),
        ),
    ])
    .unwrap();

    let props = WriterProperties::builder()
        // ensure we write the page index level statistics
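        // (`EnabledStatistics::Page` also implies chunk-level statistics)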
        .set_statistics_enabled(EnabledStatistics::Page)
        .build();

    let mut writer = ArrowWriter::try_new(new_file, batch.schema(), Some(props)).unwrap();

    writer.write(&batch).unwrap();
    writer.finish().unwrap();

    path
}