// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::RecordBatch;
use arrow_cast::pretty::pretty_format_batches;
use parquet::arrow::arrow_reader::{
    ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
};
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter};
use std::path::Path;

/// This example demonstrates advanced usage of the Parquet metadata APIs.
///
/// It shows how to store Parquet metadata somewhere other than the Parquet
/// file itself and then use that metadata to read the file. For example, the
/// metadata for Parquet files on remote object storage (e.g. S3) could be kept
/// in a local file or an in-memory cache, analyzed with a query engine such as
/// DataFusion to determine which files may contain relevant data, and then
/// only the required files read, each with a single object store request.
///
/// The example:
/// 1. Reads the metadata of a Parquet file
/// 2. Prepares the metadata for storage (this example keeps it unchanged, but
///    it could, for example, drop column statistics to make it smaller)
/// 3. Stores the metadata in a separate file
/// 4. Reads the metadata from the separate file and uses it to read the Parquet file
///
/// Without this API, implementing this functionality would require converting
/// `ParquetMetaData` and related structures to/from some other structs that
/// can be serialized and deserialized.
#[tokio::main(flavor = "current_thread")]
async fn main() -> parquet::errors::Result<()> {
    let testdata = arrow::util::test_util::parquet_test_data();
    let parquet_path = format!("{testdata}/alltypes_plain.parquet");
    let metadata_path = "thrift_metadata.dat"; // TODO: use a tempdir; a local file is used for now so the output can be inspected

    let metadata = get_metadata_from_parquet_file(&parquet_path).await;
    println!(
        "Read metadata from Parquet file into memory: {} bytes",
        metadata.memory_size()
    );
    let metadata = prepare_metadata(metadata);
    write_metadata_to_file(metadata, &metadata_path);

    // now read the metadata from the file and use it to read the Parquet file
    let metadata = read_metadata_from_file(&metadata_path);
    println!("Read metadata from file: {metadata:#?}");

    let batches = read_parquet_file_with_metadata(&parquet_path, metadata);

    // display the results
    let batches_string = pretty_format_batches(&batches).unwrap().to_string();
    let batches_lines: Vec<_> = batches_string.split('\n').collect();

    assert_eq!(
        batches_lines,
        [
            "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
            "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col  | string_col | timestamp_col       |",
            "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
            "| 4  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30332f30312f3039 | 30         | 2009-03-01T00:00:00 |",
            "| 5  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30332f30312f3039 | 31         | 2009-03-01T00:01:00 |",
            "| 6  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30342f30312f3039 | 30         | 2009-04-01T00:00:00 |",
            "| 7  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30342f30312f3039 | 31         | 2009-04-01T00:01:00 |",
            "| 2  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30322f30312f3039 | 30         | 2009-02-01T00:00:00 |",
            "| 3  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30322f30312f3039 | 31         | 2009-02-01T00:01:00 |",
            "| 0  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30312f30312f3039 | 30         | 2009-01-01T00:00:00 |",
            "| 1  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30312f30312f3039 | 31         | 2009-01-01T00:01:00 |",
            "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
        ],
        "actual output:\n\n{batches_lines:#?}"
    );

    Ok(())
}

/// Reads the metadata from a Parquet file
async fn get_metadata_from_parquet_file(file: impl AsRef<Path>) -> ParquetMetaData {
    // pretend we are reading the metadata from a remote object store
    let file = std::fs::File::open(file).unwrap();
    let file = tokio::fs::File::from_std(file);

    let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();

    // The metadata is Arc'd -- since we are going to modify it we
    // need to clone it
    builder.metadata().as_ref().clone()
}

/// Modifies the metadata before storing it, for example to reduce its size
fn prepare_metadata(metadata: ParquetMetaData) -> ParquetMetaData {
    // This example stores the metadata unchanged.
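    // A real application might shrink it here first, for example by rebuilding
    // the row group metadata without per-column statistics or page indexes
    // (see the builders in `parquet::file::metadata`).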
    metadata
}

/// Writes the metadata to a file
///
/// The data is stored using the same thrift format as the Parquet file metadata
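///
/// Note: `ParquetMetaDataWriter` stores the metadata using the same footer
/// layout as a Parquet file (any column/offset indexes, the thrift-encoded
/// metadata, its length, and the `PAR1` magic), which is why
/// `read_metadata_from_file` below can parse it back with a
/// `ParquetMetaDataReader`.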
fn write_metadata_to_file(metadata: ParquetMetaData, file: impl AsRef<Path>) {
    let file = std::fs::File::create(file).unwrap();
    let writer = ParquetMetaDataWriter::new(file, &metadata);
    writer.finish().unwrap()
}

/// Reads the metadata from a file
///
/// This function reads the format written by `write_metadata_to_file`
fn read_metadata_from_file(file: impl AsRef<Path>) -> ParquetMetaData {
    let mut file = std::fs::File::open(file).unwrap();
    ParquetMetaDataReader::new()
        .with_column_indexes(true)
        .with_offset_indexes(true)
        .parse_and_finish(&mut file)
        .unwrap()
}

/// Reads the Parquet file using the metadata
///
/// This shows how to read the Parquet file using previously read metadata
/// instead of the metadata stored in the Parquet file itself. This avoids a
/// round of IO: the metadata does not have to be fetched and decoded from the
/// Parquet file footer before the data can be read.
///
/// In this example, we read the results as Arrow record batches.
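///
/// Note: `ArrowReaderMetadata::try_new` derives the Arrow schema from the
/// supplied Parquet metadata, so no extra schema information is needed to
/// build the reader.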
fn read_parquet_file_with_metadata(
    file: impl AsRef<Path>,
    metadata: ParquetMetaData,
) -> Vec<RecordBatch> {
    let file = std::fs::File::open(file).unwrap();
    let options = ArrowReaderOptions::new()
        // tell the reader to read the page index
        .with_page_index(true);
    // create a reader with pre-existing metadata
    let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
    let reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, arrow_reader_metadata)
        .build()
        .unwrap();

    reader.collect::<arrow::error::Result<Vec<_>>>().unwrap()
}