diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs
index 1cfc9c1355167..69308a092e2dd 100644
--- a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs
+++ b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs
@@ -19,8 +19,12 @@ use async_compression::tokio::bufread::GzipDecoder;
 use async_trait::async_trait;
 use futures::TryStreamExt;
 use futures_async_stream::try_stream;
+use itertools::Itertools;
 use opendal::Operator;
-use parquet::arrow::ParquetRecordBatchStreamBuilder;
+use parquet::arrow::async_reader::AsyncFileReader;
+use parquet::arrow::{parquet_to_arrow_schema, ParquetRecordBatchStreamBuilder, ProjectionMask};
+use parquet::file::metadata::FileMetaData;
+use risingwave_common::array::arrow::IcebergArrowConvert;
 use risingwave_common::array::StreamChunk;
 use risingwave_common::util::tokio_util::compat::FuturesAsyncReadCompatExt;
 use tokio::io::{AsyncRead, BufReader};
@@ -46,6 +50,7 @@ pub struct OpendalReader<Src: OpendalSource> {
     splits: Vec<OpendalFsSplit<Src>>,
     parser_config: ParserConfig,
     source_ctx: SourceContextRef,
+    columns: Option<Vec<Column>>,
 }
 #[async_trait]
 impl<Src: OpendalSource> SplitReader for OpendalReader<Src> {
@@ -57,7 +62,7 @@ impl<Src: OpendalSource> SplitReader for OpendalReader<Src> {
         splits: Vec<OpendalFsSplit<Src>>,
         parser_config: ParserConfig,
         source_ctx: SourceContextRef,
-        _columns: Option<Vec<Column>>,
+        columns: Option<Vec<Column>>,
     ) -> ConnectorResult<Self> {
         let connector = Src::new_enumerator(properties)?;
         let opendal_reader = OpendalReader {
@@ -65,6 +70,7 @@ impl<Src: OpendalSource> SplitReader for OpendalReader<Src> {
             splits,
             parser_config,
             source_ctx,
+            columns,
         };
         Ok(opendal_reader)
     }
@@ -86,7 +92,7 @@ impl<Src: OpendalSource> OpendalReader<Src> {
 
         if let EncodingProperties::Parquet = &self.parser_config.specific.encoding_config {
             // // If the format is "parquet", use `ParquetParser` to convert `record_batch` into stream chunk.
-            let reader: tokio_util::compat::Compat<opendal::FuturesAsyncReader> = self
+            let mut reader: tokio_util::compat::Compat<opendal::FuturesAsyncReader> = self
                 .connector
                 .op
                 .reader_with(&object_name)
@@ -95,11 +101,19 @@ impl<Src: OpendalSource> OpendalReader<Src> {
                 .into_futures_async_read(..)
                 .await?
                 .compat();
+            let parquet_metadata = reader.get_metadata().await.map_err(anyhow::Error::from)?;
+
+            let file_metadata = parquet_metadata.file_metadata();
+            let column_indices =
+                extract_valid_column_indices(self.columns.clone(), file_metadata)?;
+            let projection_mask =
+                ProjectionMask::leaves(file_metadata.schema_descr(), column_indices);
             // For the Parquet format, we directly convert from a record batch to a stream chunk.
             // Therefore, the offset of the Parquet file represents the current position in terms of the number of rows read from the file.
             let record_batch_stream = ParquetRecordBatchStreamBuilder::new(reader)
                 .await?
                 .with_batch_size(self.source_ctx.source_ctrl_opts.chunk_size)
+                .with_projection(projection_mask)
                 .with_offset(split.offset)
                 .build()?;
 
@@ -215,3 +229,59 @@ impl<Src: OpendalSource> OpendalReader<Src> {
         }
     }
 }
+
+/// Extracts valid column indices from a Parquet file schema based on the user's requested schema.
+///
+/// This function is used for column pruning of Parquet files. It calculates the intersection
+/// between the columns in the currently read Parquet file and the schema provided by the user.
+/// This is useful for reading a `RecordBatch` with the appropriate `ProjectionMask`, ensuring that
+/// only the necessary columns are read.
+///
+/// # Parameters
+/// - `columns`: A vector of `Column` representing the user's requested schema.
+/// - `metadata`: A reference to `FileMetaData` containing the schema and metadata of the Parquet file.
+///
+/// # Returns
+/// - A `ConnectorResult<Vec<usize>>`, which contains the indices of the valid columns in the
+///   Parquet file schema that match the requested schema. If an error occurs during processing,
+///   it returns an appropriate error.
+pub fn extract_valid_column_indices(
+    columns: Option<Vec<Column>>,
+    metadata: &FileMetaData,
+) -> ConnectorResult<Vec<usize>> {
+    match columns {
+        Some(rw_columns) => {
+            let parquet_column_names = metadata
+                .schema_descr()
+                .columns()
+                .iter()
+                .map(|c| c.name())
+                .collect_vec();
+
+            let converted_arrow_schema =
+                parquet_to_arrow_schema(metadata.schema_descr(), metadata.key_value_metadata())
+                    .map_err(anyhow::Error::from)?;
+
+            let valid_column_indices: Vec<usize> = rw_columns
+                .iter()
+                .filter_map(|column| {
+                    parquet_column_names
+                        .iter()
+                        .position(|&name| name == column.name)
+                        .and_then(|pos| {
+                            let arrow_field = IcebergArrowConvert
+                                .to_arrow_field(&column.name, &column.data_type)
+                                .ok()?;
+                            if &arrow_field == converted_arrow_schema.field(pos) {
+                                Some(pos)
+                            } else {
+                                None
+                            }
+                        })
+                })
+                .collect();
+            Ok(valid_column_indices)
+        }
+        None => Ok(vec![]),
+    }
+}
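
For reference, below is a minimal, self-contained sketch of the same column-pruning idea against the `parquet` crate's async reader. It is an illustration only, not part of the patch: the file name `data.parquet` and the `wanted` column list are placeholders, and it matches leaf columns by name alone, skipping the Arrow field-level type check that `extract_valid_column_indices` performs via `IcebergArrowConvert`.

// Sketch only; assumes the `parquet` crate with its async (tokio) feature,
// plus `tokio` and `futures`. File name and column names are hypothetical.
use futures::TryStreamExt;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = tokio::fs::File::open("data.parquet").await?;
    let builder = ParquetRecordBatchStreamBuilder::new(file).await?;

    // Intersect the file's leaf columns with the requested names, keeping the
    // leaf indices of the matches -- the same intersection step the patch does,
    // minus the Arrow type comparison.
    let wanted = ["id", "name"];
    let schema_descr = builder.metadata().file_metadata().schema_descr();
    let indices: Vec<usize> = schema_descr
        .columns()
        .iter()
        .enumerate()
        .filter(|(_, c)| wanted.contains(&c.name()))
        .map(|(i, _)| i)
        .collect();

    // Only the projected leaves are decoded; every record batch carries just them.
    let mask = ProjectionMask::leaves(schema_descr, indices);
    let mut stream = builder
        .with_projection(mask)
        .with_batch_size(1024)
        .build()?;
    while let Some(batch) = stream.try_next().await? {
        println!("rows: {}, cols: {}", batch.num_rows(), batch.num_columns());
    }
    Ok(())
}

The design point this mirrors is that the mask is built from the file's own schema descriptor, so a requested column that is absent from (or, in the patch, type-incompatible with) the Parquet file is simply dropped from the projection instead of failing the read.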