feat(source): column pruning for parquet file source (#18967)
wcy-fdu authored Oct 25, 2024
1 parent ca75c21 · commit 083d66d
Showing 1 changed file with 73 additions and 3 deletions.
@@ -19,8 +19,12 @@ use async_compression::tokio::bufread::GzipDecoder;
 use async_trait::async_trait;
 use futures::TryStreamExt;
 use futures_async_stream::try_stream;
+use itertools::Itertools;
 use opendal::Operator;
-use parquet::arrow::ParquetRecordBatchStreamBuilder;
+use parquet::arrow::async_reader::AsyncFileReader;
+use parquet::arrow::{parquet_to_arrow_schema, ParquetRecordBatchStreamBuilder, ProjectionMask};
+use parquet::file::metadata::FileMetaData;
+use risingwave_common::array::arrow::IcebergArrowConvert;
 use risingwave_common::array::StreamChunk;
 use risingwave_common::util::tokio_util::compat::FuturesAsyncReadCompatExt;
 use tokio::io::{AsyncRead, BufReader};
@@ -46,6 +50,7 @@ pub struct OpendalReader<Src: OpendalSource> {
     splits: Vec<OpendalFsSplit<Src>>,
     parser_config: ParserConfig,
     source_ctx: SourceContextRef,
+    columns: Option<Vec<Column>>,
 }
 #[async_trait]
 impl<Src: OpendalSource> SplitReader for OpendalReader<Src> {
@@ -57,14 +62,15 @@ impl<Src: OpendalSource> SplitReader for OpendalReader<Src> {
         splits: Vec<OpendalFsSplit<Src>>,
         parser_config: ParserConfig,
         source_ctx: SourceContextRef,
-        _columns: Option<Vec<Column>>,
+        columns: Option<Vec<Column>>,
     ) -> ConnectorResult<Self> {
         let connector = Src::new_enumerator(properties)?;
         let opendal_reader = OpendalReader {
             connector,
             splits,
             parser_config,
             source_ctx,
+            columns,
         };
         Ok(opendal_reader)
     }
@@ -86,7 +92,7 @@ impl<Src: OpendalSource> OpendalReader<Src> {

         if let EncodingProperties::Parquet = &self.parser_config.specific.encoding_config {
             // // If the format is "parquet", use `ParquetParser` to convert `record_batch` into stream chunk.
-            let reader: tokio_util::compat::Compat<opendal::FuturesAsyncReader> = self
+            let mut reader: tokio_util::compat::Compat<opendal::FuturesAsyncReader> = self
                 .connector
                 .op
                 .reader_with(&object_name)
@@ -95,11 +101,19 @@
                 .into_futures_async_read(..)
                 .await?
                 .compat();
+            let parquet_metadata = reader.get_metadata().await.map_err(anyhow::Error::from)?;
+
+            let file_metadata = parquet_metadata.file_metadata();
+            let column_indices =
+                extract_valid_column_indices(self.columns.clone(), file_metadata)?;
+            let projection_mask =
+                ProjectionMask::leaves(file_metadata.schema_descr(), column_indices);
             // For the Parquet format, we directly convert from a record batch to a stream chunk.
             // Therefore, the offset of the Parquet file represents the current position in terms of the number of rows read from the file.
             let record_batch_stream = ParquetRecordBatchStreamBuilder::new(reader)
                 .await?
                 .with_batch_size(self.source_ctx.source_ctrl_opts.chunk_size)
+                .with_projection(projection_mask)
                 .with_offset(split.offset)
                 .build()?;
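Note on this hunk: `AsyncFileReader::get_metadata` takes `&mut self`, which is why the reader binding becomes `mut` above. The fetched metadata's schema then drives a `ProjectionMask`, so the arrow-rs reader decodes only the requested leaf columns. Below is a minimal, self-contained sketch of the same pattern (not part of this commit), assuming a local `tokio::fs::File`, the parquet crate's `async` feature, `anyhow` for errors, and a hypothetical `wanted` name list standing in for RisingWave's `Column` schema:

```rust
use futures::TryStreamExt;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use tokio::fs::File;

async fn read_pruned(path: &str, wanted: &[&str]) -> anyhow::Result<()> {
    let file = File::open(path).await?;
    let builder = ParquetRecordBatchStreamBuilder::new(file).await?;

    // Intersect the requested names with the file's leaf columns; names the
    // file does not contain are simply skipped.
    let indices: Vec<usize> = builder
        .parquet_schema()
        .columns()
        .iter()
        .enumerate()
        .filter(|(_, c)| wanted.contains(&c.name()))
        .map(|(i, _)| i)
        .collect();
    let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);

    let mut stream = builder
        .with_projection(mask)
        .with_batch_size(1024)
        .build()?;

    // Each RecordBatch now carries only the projected columns.
    while let Some(batch) = stream.try_next().await? {
        println!("rows: {}, cols: {}", batch.num_rows(), batch.num_columns());
    }
    Ok(())
}
```

Pruning happens at decode time: only the pages of projected columns are fetched and decoded into each `RecordBatch`, which is what makes the projection worthwhile for wide files.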
@@ -215,3 +229,59 @@
         }
     }
 }
+
+/// Extracts valid column indices from a Parquet file schema based on the user's requested schema.
+///
+/// This function is used for column pruning of Parquet files. It calculates the intersection
+/// between the columns in the currently read Parquet file and the schema provided by the user.
+/// This is useful for reading a `RecordBatch` with the appropriate `ProjectionMask`, ensuring that
+/// only the necessary columns are read.
+///
+/// # Parameters
+/// - `columns`: A vector of `Column` representing the user's requested schema.
+/// - `metadata`: A reference to `FileMetaData` containing the schema and metadata of the Parquet file.
+///
+/// # Returns
+/// - A `ConnectorResult<Vec<usize>>`, which contains the indices of the valid columns in the
+///   Parquet file schema that match the requested schema. If an error occurs during processing,
+///   it returns an appropriate error.
+pub fn extract_valid_column_indices(
+    columns: Option<Vec<Column>>,
+    metadata: &FileMetaData,
+) -> ConnectorResult<Vec<usize>> {
+    match columns {
+        Some(rw_columns) => {
+            let parquet_column_names = metadata
+                .schema_descr()
+                .columns()
+                .iter()
+                .map(|c| c.name())
+                .collect_vec();
+
+            let converted_arrow_schema =
+                parquet_to_arrow_schema(metadata.schema_descr(), metadata.key_value_metadata())
+                    .map_err(anyhow::Error::from)?;
+
+            let valid_column_indices: Vec<usize> = rw_columns
+                .iter()
+                .filter_map(|column| {
+                    parquet_column_names
+                        .iter()
+                        .position(|&name| name == column.name)
+                        .and_then(|pos| {
+                            let arrow_field = IcebergArrowConvert
+                                .to_arrow_field(&column.name, &column.data_type)
+                                .ok()?;
+                            if &arrow_field == converted_arrow_schema.field(pos) {
+                                Some(pos)
+                            } else {
+                                None
+                            }
+                        })
+                })
+                .collect();
+            Ok(valid_column_indices)
+        }
+        None => Ok(vec![]),
+    }
+}
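The new helper intersects by name and then verifies the type: each requested RisingWave column is converted to an Arrow field via `IcebergArrowConvert`, and the index is kept only if that field equals the corresponding field of the file's converted schema, so a same-named column with an incompatible type is silently dropped from the projection. A minimal sketch of that intersection rule (not from the commit), restated with only `arrow_schema` types and a hypothetical `valid_indices` helper:

```rust
use arrow_schema::{Field, Schema};

/// Indices of requested fields that exist in `file_schema` with an exactly
/// matching field (name, data type, and nullability).
fn valid_indices(requested: &[Field], file_schema: &Schema) -> Vec<usize> {
    requested
        .iter()
        .filter_map(|want| {
            file_schema
                .fields()
                .iter()
                .position(|have| have.name() == want.name())
                // Keep the position only on exact field equality, mirroring
                // the `&arrow_field == converted_arrow_schema.field(pos)` check.
                .filter(|&pos| file_schema.field(pos) == want)
        })
        .collect()
}
```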
