From d7e04627282b99a45cb83b02d46219049ac7b380 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 22 May 2024 08:01:00 -0400 Subject: [PATCH 1/8] Add FileScanConfig::new() API, update code to use new API --- datafusion-examples/examples/csv_opener.rs | 16 +- datafusion-examples/examples/json_opener.rs | 16 +- .../core/src/datasource/file_format/mod.rs | 15 +- .../core/src/datasource/listing/table.rs | 17 +-- .../core/src/datasource/physical_plan/avro.rs | 55 +++---- .../core/src/datasource/physical_plan/csv.rs | 12 +- .../physical_plan/file_scan_config.rs | 139 ++++++++++++++++-- .../datasource/physical_plan/file_stream.rs | 16 +- .../core/src/datasource/physical_plan/json.rs | 52 ++----- .../datasource/physical_plan/parquet/mod.rs | 56 ++----- .../combine_partial_final_agg.rs | 17 +-- .../enforce_distribution.rs | 74 +++------- .../physical_optimizer/projection_pushdown.rs | 34 ++--- .../replace_with_order_preserving_variants.rs | 22 +-- .../core/src/physical_optimizer/test_utils.rs | 27 +--- datafusion/core/src/test/mod.rs | 42 ++---- datafusion/core/src/test_util/parquet.rs | 27 ++-- .../core/tests/parquet/custom_reader.rs | 17 +-- datafusion/core/tests/parquet/page_pruning.rs | 14 +- .../core/tests/parquet/schema_adapter.rs | 14 +- .../core/tests/parquet/schema_coercion.rs | 31 +--- datafusion/execution/src/object_store.rs | 9 +- .../substrait/src/physical_plan/consumer.rs | 17 +-- .../tests/cases/roundtrip_physical_plan.rs | 35 ++--- 24 files changed, 331 insertions(+), 443 deletions(-) diff --git a/datafusion-examples/examples/csv_opener.rs b/datafusion-examples/examples/csv_opener.rs index 96753c8c5260..d02aa9b3088c 100644 --- a/datafusion-examples/examples/csv_opener.rs +++ b/datafusion-examples/examples/csv_opener.rs @@ -17,7 +17,6 @@ use std::{sync::Arc, vec}; -use datafusion::common::Statistics; use datafusion::{ assert_batches_eq, datasource::{ @@ -58,16 +57,11 @@ async fn main() -> Result<()> { let path = std::path::Path::new(&path).canonicalize()?; - let scan_config = FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_schema: schema.clone(), - file_groups: vec![vec![PartitionedFile::new(path.display().to_string(), 10)]], - statistics: Statistics::new_unknown(&schema), - projection: Some(vec![12, 0]), - limit: Some(5), - table_partition_cols: vec![], - output_ordering: vec![], - }; + let scan_config = + FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema.clone()) + .with_projection(Some(vec![12, 0])) + .with_limit(Some(5)) + .with_file(PartitionedFile::new(path.display().to_string(), 10)); let result = FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new()) diff --git a/datafusion-examples/examples/json_opener.rs b/datafusion-examples/examples/json_opener.rs index ee33f969caa9..e32fb9b09630 100644 --- a/datafusion-examples/examples/json_opener.rs +++ b/datafusion-examples/examples/json_opener.rs @@ -29,7 +29,6 @@ use datafusion::{ error::Result, physical_plan::metrics::ExecutionPlanMetricsSet, }; -use datafusion_common::Statistics; use futures::StreamExt; use object_store::ObjectStore; @@ -61,16 +60,11 @@ async fn main() -> Result<()> { Arc::new(object_store), ); - let scan_config = FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_schema: schema.clone(), - file_groups: vec![vec![PartitionedFile::new(path.to_string(), 10)]], - statistics: Statistics::new_unknown(&schema), - projection: Some(vec![1, 0]), - limit: Some(5), - table_partition_cols: vec![], - output_ordering: vec![], - }; + 
let scan_config = + FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema.clone()) + .with_projection(Some(vec![1, 0])) + .with_limit(Some(5)) + .with_file(PartitionedFile::new(path.to_string(), 10)); let result = FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new()) diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index 243a91b7437b..7cc3421ebb48 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -154,16 +154,11 @@ pub(crate) mod test_util { let exec = format .create_physical_plan( state, - FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_schema, - file_groups, - statistics, - projection, - limit, - table_partition_cols: vec![], - output_ordering: vec![], - }, + FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema) + .with_file_groups(file_groups) + .with_statistics(statistics) + .with_projection(projection) + .with_limit(limit), None, ) .await?; diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index cf70894806a3..746e4b8e3330 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -805,16 +805,13 @@ impl TableProvider for ListingTable { .format .create_physical_plan( state, - FileScanConfig { - object_store_url, - file_schema: Arc::clone(&self.file_schema), - file_groups: partitioned_file_lists, - statistics, - projection: projection.cloned(), - limit, - output_ordering, - table_partition_cols, - }, + FileScanConfig::new(object_store_url, Arc::clone(&self.file_schema)) + .with_file_groups(partitioned_file_lists) + .with_statistics(statistics) + .with_projection(projection.cloned()) + .with_limit(limit) + .with_output_ordering(output_ordering) + .with_table_partition_cols(table_partition_cols), filters.as_ref(), ) .await diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs index 4e5140e82d3f..28649e2b1cac 100644 --- a/datafusion/core/src/datasource/physical_plan/avro.rs +++ b/datafusion/core/src/datasource/physical_plan/avro.rs @@ -273,16 +273,11 @@ mod tests { .infer_schema(&state, &store, &[meta.clone()]) .await?; - let avro_exec = AvroExec::new(FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_groups: vec![vec![meta.into()]], - statistics: Statistics::new_unknown(&file_schema), - file_schema, - projection: Some(vec![0, 1, 2]), - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - }); + let avro_exec = AvroExec::new( + FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema) + .with_file(meta.into()) + .with_projection(Some(vec![0, 1, 2])), + ); assert_eq!( avro_exec .properties() @@ -350,16 +345,11 @@ mod tests { // Include the missing column in the projection let projection = Some(vec![0, 1, 2, actual_schema.fields().len()]); - let avro_exec = AvroExec::new(FileScanConfig { - object_store_url, - file_groups: vec![vec![meta.into()]], - statistics: Statistics::new_unknown(&file_schema), - file_schema, - projection, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - }); + let avro_exec = AvroExec::new( + FileScanConfig::new(object_store_url, file_schema) + .with_file(meta.into()) + .with_projection(projection), + ); assert_eq!( avro_exec .properties() @@ -424,18 +414,19 @@ mod tests { let mut 
partitioned_file = PartitionedFile::from(meta); partitioned_file.partition_values = vec![ScalarValue::from("2021-10-26")]; - let avro_exec = AvroExec::new(FileScanConfig { - // select specific columns of the files as well as the partitioning - // column which is supposed to be the last column in the table schema. - projection: Some(vec![0, 1, file_schema.fields().len(), 2]), - object_store_url, - file_groups: vec![vec![partitioned_file]], - statistics: Statistics::new_unknown(&file_schema), - file_schema, - limit: None, - table_partition_cols: vec![Field::new("date", DataType::Utf8, false)], - output_ordering: vec![], - }); + let projection = Some(vec![0, 1, file_schema.fields().len(), 2]); + let avro_exec = AvroExec::new( + FileScanConfig::new(object_store_url, file_schema) + // select specific columns of the files as well as the partitioning + // column which is supposed to be the last column in the table schema. + .with_projection(projection) + .with_file(partitioned_file) + .with_table_partition_cols(vec![Field::new( + "date", + DataType::Utf8, + false, + )]), + ); assert_eq!( avro_exec .properties() diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index cc7c837e471e..7989ce64b236 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -561,7 +561,7 @@ mod tests { tmp_dir.path(), )?; - let mut config = partitioned_csv_config(file_schema, file_groups)?; + let mut config = partitioned_csv_config(file_schema, file_groups); config.projection = Some(vec![0, 2, 4]); let csv = CsvExec::new( @@ -627,7 +627,7 @@ mod tests { tmp_dir.path(), )?; - let mut config = partitioned_csv_config(file_schema, file_groups)?; + let mut config = partitioned_csv_config(file_schema, file_groups); config.projection = Some(vec![4, 0, 2]); let csv = CsvExec::new( @@ -693,7 +693,7 @@ mod tests { tmp_dir.path(), )?; - let mut config = partitioned_csv_config(file_schema, file_groups)?; + let mut config = partitioned_csv_config(file_schema, file_groups); config.limit = Some(5); let csv = CsvExec::new( @@ -756,7 +756,7 @@ mod tests { tmp_dir.path(), )?; - let mut config = partitioned_csv_config(file_schema, file_groups)?; + let mut config = partitioned_csv_config(file_schema, file_groups); config.limit = Some(5); let csv = CsvExec::new( @@ -809,7 +809,7 @@ mod tests { tmp_dir.path(), )?; - let mut config = partitioned_csv_config(file_schema, file_groups)?; + let mut config = partitioned_csv_config(file_schema, file_groups); // Add partition columns config.table_partition_cols = vec![Field::new("date", DataType::Utf8, false)]; @@ -914,7 +914,7 @@ mod tests { ) .unwrap(); - let config = partitioned_csv_config(file_schema, file_groups).unwrap(); + let config = partitioned_csv_config(file_schema, file_groups); let csv = CsvExec::new( config, true, diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 4de7eb136f22..16c5ce3a9ba6 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -64,12 +64,43 @@ pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue { /// The base configurations to provide when creating a physical plan for /// any given file format. 
+///
+/// # Example
+/// ```
+/// # use std::sync::Arc;
+/// # use arrow_schema::Schema;
+/// use datafusion::datasource::listing::PartitionedFile;
+/// # use datafusion::datasource::physical_plan::FileScanConfig;
+/// # use datafusion_execution::object_store::ObjectStoreUrl;
+/// # let file_schema = Arc::new(Schema::empty());
+/// // create FileScan config for reading data from file://
+/// let object_store_url = ObjectStoreUrl::local_filesystem();
+/// let mut config = FileScanConfig::new(object_store_url, file_schema)
+///   .with_limit(Some(1000))            // read only the first 1000 records
+///   .with_projection(Some(vec![2, 3])); // project columns 2 and 3
+///
+/// // Read /tmp/file1.parquet with known size of 1234 bytes in a single group
+/// config.add_file(PartitionedFile::new("file1.parquet", 1234));
+///
+/// // Read /tmp/file2.parquet 56 bytes and /tmp/file3.parquet 78 bytes
+/// // in a single file group
+/// config.add_file_group(vec![
+///   PartitionedFile::new("file2.parquet", 56),
+///   PartitionedFile::new("file3.parquet", 78),
+/// ]);
+/// ```
 #[derive(Clone)]
 pub struct FileScanConfig {
     /// Object store URL, used to get an [`ObjectStore`] instance from
     /// [`RuntimeEnv::object_store`]
     ///
+    /// This `ObjectStoreUrl` should be the prefix of the absolute url for files
+    /// as `file://` or `s3://my_bucket`. It should not include the path to the
+    /// file itself. The relevant URL prefix must be registered via
+    /// [`RuntimeEnv::register_object_store`]
+    ///
     /// [`ObjectStore`]: object_store::ObjectStore
+    /// [`RuntimeEnv::register_object_store`]: datafusion_execution::runtime_env::RuntimeEnv::register_object_store
     /// [`RuntimeEnv::object_store`]: datafusion_execution::runtime_env::RuntimeEnv::object_store
     pub object_store_url: ObjectStoreUrl,
     /// Schema before `projection` is applied. It contains the all columns that may
@@ -87,6 +118,7 @@ pub struct FileScanConfig {
     /// sequentially, one after the next.
     pub file_groups: Vec<Vec<PartitionedFile>>,
     /// Estimated overall statistics of the files, taking `filters` into account.
+    /// Defaults to [`Statistics::new_unknown`].
     pub statistics: Statistics,
     /// Columns on which to project the data. Indexes that are higher than the
     /// number of columns of `file_schema` refer to `table_partition_cols`.
@@ -101,6 +133,99 @@ pub struct FileScanConfig {
 }
 
 impl FileScanConfig {
+    /// Create a new `FileScanConfig` with default settings for scanning files.
+    ///
+    /// No file groups are added by default. See [`Self::add_file`] and
+    /// [`Self::add_file_group`]
+    ///
+    /// # Parameters:
+    /// * `object_store_url`: See [`Self::object_store_url`]
+    /// * `file_schema`: See [`Self::file_schema`]
+    pub fn new(object_store_url: ObjectStoreUrl, file_schema: SchemaRef) -> Self {
+        let statistics = Statistics::new_unknown(&file_schema);
+        Self {
+            object_store_url,
+            file_schema,
+            file_groups: vec![],
+            statistics,
+            projection: None,
+            limit: None,
+            table_partition_cols: vec![],
+            output_ordering: vec![],
+        }
+    }
+
+    /// Add a new file as a single file group
+    ///
+    /// See [Self::file_groups] for more information
+    pub fn add_file(&mut self, file: PartitionedFile) {
+        self.add_file_group(vec![file])
+    }
+
+    /// Add a new file group
+    ///
+    /// See [Self::file_groups] for more information
+    pub fn add_file_group(&mut self, file_group: Vec<PartitionedFile>) {
+        self.file_groups.push(file_group);
+    }
+
+    /// Set the statistics of the files
+    pub fn with_statistics(mut self, statistics: Statistics) -> Self {
+        self.statistics = statistics;
+        self
+    }
+
+    /// Set the projection of the files
+    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
+        self.projection = projection;
+        self
+    }
+
+    /// Set the limit of the files
+    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
+        self.limit = limit;
+        self
+    }
+
+    /// Add a file as a single group
+    ///
+    /// See [Self::file_groups] for more information.
+    pub fn with_file(mut self, file: PartitionedFile) -> Self {
+        self.add_file(file);
+        self
+    }
+
+    /// Add the file groups
+    ///
+    /// See [Self::file_groups] for more information.
+    pub fn with_file_groups(
+        mut self,
+        mut file_groups: Vec<Vec<PartitionedFile>>,
+    ) -> Self {
+        self.file_groups.append(&mut file_groups);
+        self
+    }
+
+    /// Add a new file group
+    ///
+    /// See [Self::file_groups] for more information
+    pub fn with_file_group(mut self, file_group: Vec<PartitionedFile>) -> Self {
+        self.add_file_group(file_group);
+        self
+    }
+
+    /// Set the partitioning columns of the files
+    pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
+        self.table_partition_cols = table_partition_cols;
+        self
+    }
+
+    /// Set the output ordering of the files
+    pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
+        self.output_ordering = output_ordering;
+        self
+    }
+
     /// Project the schema and the statistics on the given column indices
     pub fn project(&self) -> (SchemaRef, Statistics, Vec<LexOrdering>) {
         if self.projection.is_none() && self.table_partition_cols.is_empty() {
@@ -1117,16 +1242,10 @@ mod tests {
         statistics: Statistics,
         table_partition_cols: Vec<Field>,
     ) -> FileScanConfig {
-        FileScanConfig {
-            file_schema,
-            file_groups: vec![vec![]],
-            limit: None,
-            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-            projection,
-            statistics,
-            table_partition_cols,
-            output_ordering: vec![],
-        }
+        FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), file_schema)
+            .with_projection(projection)
+            .with_statistics(statistics)
+            .with_table_partition_cols(table_partition_cols)
     }
 
     /// Convert partition columns from Vec<(String, DataType)> to Vec<Field>
diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs
index 9732d08c7a1d..6f354b31ae87 100644
--- a/datafusion/core/src/datasource/physical_plan/file_stream.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs
@@ -524,7 +524,7 @@ mod tests {
     use crate::test::{make_partition, object_store::register_test_store};
 
     use arrow_schema::Schema;
-    use datafusion_common::{internal_err, Statistics};
+    use 
datafusion_common::internal_err; /// Test `FileOpener` which will simulate errors during file opening or scanning #[derive(Default)] @@ -643,16 +643,12 @@ mod tests { let on_error = self.on_error; - let config = FileScanConfig { - object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), - statistics: Statistics::new_unknown(&file_schema), + let config = FileScanConfig::new( + ObjectStoreUrl::parse("test:///").unwrap(), file_schema, - file_groups: vec![file_group], - projection: None, - limit: self.limit, - table_partition_cols: vec![], - output_ordering: vec![], - }; + ) + .with_file_group(file_group) + .with_limit(self.limit); let metrics_set = ExecutionPlanMetricsSet::new(); let file_stream = FileStream::new(&config, 0, self.opener, &metrics_set) .unwrap() diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index 0180caa85011..66843885ce45 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -521,16 +521,9 @@ mod tests { prepare_store(&state, file_compression_type.to_owned(), tmp_dir.path()).await; let exec = NdJsonExec::new( - FileScanConfig { - object_store_url, - file_groups, - statistics: Statistics::new_unknown(&file_schema), - file_schema, - projection: None, - limit: Some(3), - table_partition_cols: vec![], - output_ordering: vec![], - }, + FileScanConfig::new(object_store_url, file_schema) + .with_file_groups(file_groups) + .with_limit(Some(3)), file_compression_type.to_owned(), ); @@ -599,16 +592,9 @@ mod tests { let missing_field_idx = file_schema.fields.len() - 1; let exec = NdJsonExec::new( - FileScanConfig { - object_store_url, - file_groups, - statistics: Statistics::new_unknown(&file_schema), - file_schema, - projection: None, - limit: Some(3), - table_partition_cols: vec![], - output_ordering: vec![], - }, + FileScanConfig::new(object_store_url, file_schema) + .with_file_groups(file_groups) + .with_limit(Some(3)), file_compression_type.to_owned(), ); @@ -646,16 +632,9 @@ mod tests { prepare_store(&state, file_compression_type.to_owned(), tmp_dir.path()).await; let exec = NdJsonExec::new( - FileScanConfig { - object_store_url, - file_groups, - statistics: Statistics::new_unknown(&file_schema), - file_schema, - projection: Some(vec![0, 2]), - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - }, + FileScanConfig::new(object_store_url, file_schema) + .with_file_groups(file_groups) + .with_projection(Some(vec![0, 2])), file_compression_type.to_owned(), ); let inferred_schema = exec.schema(); @@ -698,16 +677,9 @@ mod tests { prepare_store(&state, file_compression_type.to_owned(), tmp_dir.path()).await; let exec = NdJsonExec::new( - FileScanConfig { - object_store_url, - file_groups, - statistics: Statistics::new_unknown(&file_schema), - file_schema, - projection: Some(vec![3, 0, 2]), - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - }, + FileScanConfig::new(object_store_url, file_schema) + .with_file_groups(file_groups) + .with_projection(Some(vec![3, 0, 2])), file_compression_type.to_owned(), ); let inferred_schema = exec.schema(); diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index dd953878df49..1abd0bf2b83d 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -920,23 +920,16 @@ mod tests { // files with 
multiple pages let multi_page = page_index_predicate; let (meta, _files) = store_parquet(batches, multi_page).await.unwrap(); - let file_groups = meta.into_iter().map(Into::into).collect(); + let file_group = meta.into_iter().map(Into::into).collect(); // set up predicate (this is normally done by a layer higher up) let predicate = predicate.map(|p| logical2physical(&p, &file_schema)); // prepare the scan let mut parquet_exec = ParquetExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_groups: vec![file_groups], - statistics: Statistics::new_unknown(&file_schema), - file_schema, - projection, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - }, + FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema) + .with_file_group(file_group) + .with_projection(projection), predicate, None, Default::default(), @@ -1585,16 +1578,8 @@ mod tests { file_schema: SchemaRef, ) -> Result<()> { let parquet_exec = ParquetExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_groups, - statistics: Statistics::new_unknown(&file_schema), - file_schema, - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - }, + FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema) + .with_file_groups(file_groups), None, None, Default::default(), @@ -1695,15 +1680,11 @@ mod tests { ]); let parquet_exec = ParquetExec::new( - FileScanConfig { - object_store_url, - file_groups: vec![vec![partitioned_file]], - file_schema: schema.clone(), - statistics: Statistics::new_unknown(&schema), + FileScanConfig::new(object_store_url, schema.clone()) + .with_file(partitioned_file) // file has 10 cols so index 12 should be month and 13 should be day - projection: Some(vec![0, 1, 2, 12, 13]), - limit: None, - table_partition_cols: vec![ + .with_projection(Some(vec![0, 1, 2, 12, 13])) + .with_table_partition_cols(vec![ Field::new("year", DataType::Utf8, false), Field::new("month", DataType::UInt8, false), Field::new( @@ -1714,9 +1695,7 @@ mod tests { ), false, ), - ], - output_ordering: vec![], - }, + ]), None, None, Default::default(), @@ -1774,17 +1753,10 @@ mod tests { extensions: None, }; + let file_schema = Arc::new(Schema::empty()); let parquet_exec = ParquetExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_groups: vec![vec![partitioned_file]], - file_schema: Arc::new(Schema::empty()), - statistics: Statistics::new_unknown(&Schema::empty()), - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - }, + FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema) + .with_file(partitioned_file), None, None, Default::default(), diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs index e41e4dd31647..b93f4012b093 100644 --- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs +++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs @@ -203,7 +203,7 @@ mod tests { use crate::datasource::physical_plan::{FileScanConfig, ParquetExec}; use crate::physical_plan::expressions::lit; use crate::physical_plan::repartition::RepartitionExec; - use crate::physical_plan::{displayable, Partitioning, Statistics}; + use crate::physical_plan::{displayable, Partitioning}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use 
datafusion_physical_expr::expressions::{col, Count, Sum}; @@ -246,16 +246,11 @@ mod tests { fn parquet_exec(schema: &SchemaRef) -> Arc { Arc::new(ParquetExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), - file_schema: schema.clone(), - file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]], - statistics: Statistics::new_unknown(schema), - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - }, + FileScanConfig::new( + ObjectStoreUrl::parse("test:///").unwrap(), + schema.clone(), + ) + .with_file(PartitionedFile::new("x".to_string(), 100)), None, None, Default::default(), diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index cd84e911d381..033cec53019d 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1432,16 +1432,9 @@ pub(crate) mod tests { output_ordering: Vec>, ) -> Arc { Arc::new(ParquetExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), - file_schema: schema(), - file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]], - statistics: Statistics::new_unknown(&schema()), - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering, - }, + FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema()) + .with_file(PartitionedFile::new("x".to_string(), 100)) + .with_output_ordering(output_ordering), None, None, Default::default(), @@ -1457,19 +1450,12 @@ pub(crate) mod tests { output_ordering: Vec>, ) -> Arc { Arc::new(ParquetExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), - file_schema: schema(), - file_groups: vec![ + FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema()) + .with_file_groups(vec![ vec![PartitionedFile::new("x".to_string(), 100)], vec![PartitionedFile::new("y".to_string(), 100)], - ], - statistics: Statistics::new_unknown(&schema()), - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering, - }, + ]) + .with_output_ordering(output_ordering), None, None, Default::default(), @@ -1482,16 +1468,9 @@ pub(crate) mod tests { fn csv_exec_with_sort(output_ordering: Vec>) -> Arc { Arc::new(CsvExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), - file_schema: schema(), - file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]], - statistics: Statistics::new_unknown(&schema()), - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering, - }, + FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema()) + .with_file(PartitionedFile::new("x".to_string(), 100)) + .with_output_ordering(output_ordering), false, b',', b'"', @@ -1509,19 +1488,12 @@ pub(crate) mod tests { output_ordering: Vec>, ) -> Arc { Arc::new(CsvExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), - file_schema: schema(), - file_groups: vec![ + FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema()) + .with_file_groups(vec![ vec![PartitionedFile::new("x".to_string(), 100)], vec![PartitionedFile::new("y".to_string(), 100)], - ], - statistics: Statistics::new_unknown(&schema()), - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering, - }, + ]) + 
.with_output_ordering(output_ordering), false, b',', b'"', @@ -3790,19 +3762,11 @@ pub(crate) mod tests { let plan = aggregate_exec_with_alias( Arc::new(CsvExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), - file_schema: schema(), - file_groups: vec![vec![PartitionedFile::new( - "x".to_string(), - 100, - )]], - statistics: Statistics::new_unknown(&schema()), - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - }, + FileScanConfig::new( + ObjectStoreUrl::parse("test:///").unwrap(), + schema(), + ) + .with_file(PartitionedFile::new("x".to_string(), 100)), false, b',', b'"', diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index fe1290e40774..a15b9d4fbc87 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -1297,7 +1297,7 @@ mod tests { use crate::physical_plan::joins::StreamJoinPartitionMode; use arrow_schema::{DataType, Field, Schema, SortOptions}; - use datafusion_common::{JoinType, ScalarValue, Statistics}; + use datafusion_common::{JoinType, ScalarValue}; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::{ @@ -1676,16 +1676,12 @@ mod tests { Field::new("e", DataType::Int32, true), ])); Arc::new(CsvExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), - file_schema: schema.clone(), - file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]], - statistics: Statistics::new_unknown(&schema), - projection: Some(vec![0, 1, 2, 3, 4]), - limit: None, - table_partition_cols: vec![], - output_ordering: vec![vec![]], - }, + FileScanConfig::new( + ObjectStoreUrl::parse("test:///").unwrap(), + schema.clone(), + ) + .with_file(PartitionedFile::new("x".to_string(), 100)) + .with_projection(Some(vec![0, 1, 2, 3, 4])), false, 0, 0, @@ -1702,16 +1698,12 @@ mod tests { Field::new("d", DataType::Int32, true), ])); Arc::new(CsvExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), - file_schema: schema.clone(), - file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]], - statistics: Statistics::new_unknown(&schema), - projection: Some(vec![3, 2, 1]), - limit: None, - table_partition_cols: vec![], - output_ordering: vec![vec![]], - }, + FileScanConfig::new( + ObjectStoreUrl::parse("test:///").unwrap(), + schema.clone(), + ) + .with_file(PartitionedFile::new("x".to_string(), 100)) + .with_projection(Some(vec![3, 2, 1])), false, 0, 0, diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index f69c0df32e8a..e3ef3b95aa06 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -291,7 +291,7 @@ mod tests { use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::tree_node::{TransformedResult, TreeNode}; - use datafusion_common::{Result, Statistics}; + use datafusion_common::Result; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_expr::{JoinType, Operator}; use datafusion_physical_expr::expressions::{self, col, Column}; @@ 
-1491,19 +1491,13 @@ mod tests { let projection: Vec = vec![0, 2, 3]; Arc::new(CsvExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), - file_schema: schema.clone(), - file_groups: vec![vec![PartitionedFile::new( - "file_path".to_string(), - 100, - )]], - statistics: Statistics::new_unknown(schema), - projection: Some(projection), - limit: None, - table_partition_cols: vec![], - output_ordering: vec![sort_exprs], - }, + FileScanConfig::new( + ObjectStoreUrl::parse("test:///").unwrap(), + schema.clone(), + ) + .with_file(PartitionedFile::new("file_path".to_string(), 100)) + .with_projection(Some(projection)) + .with_output_ordering(vec![sort_exprs]), true, 0, b'"', diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs index 7bc1eeb7c4a5..4d926847e465 100644 --- a/datafusion/core/src/physical_optimizer/test_utils.rs +++ b/datafusion/core/src/physical_optimizer/test_utils.rs @@ -41,7 +41,7 @@ use crate::prelude::{CsvReadOptions, SessionContext}; use arrow_schema::{Schema, SchemaRef, SortOptions}; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_common::{JoinType, Statistics}; +use datafusion_common::JoinType; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_expr::{AggregateFunction, WindowFrame, WindowFunctionDefinition}; use datafusion_physical_expr::expressions::col; @@ -275,16 +275,8 @@ pub fn sort_preserving_merge_exec( /// Create a non sorted parquet exec pub fn parquet_exec(schema: &SchemaRef) -> Arc { Arc::new(ParquetExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), - file_schema: schema.clone(), - file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]], - statistics: Statistics::new_unknown(schema), - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - }, + FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema.clone()) + .with_file(PartitionedFile::new("x".to_string(), 100)), None, None, Default::default(), @@ -299,16 +291,9 @@ pub fn parquet_exec_sorted( let sort_exprs = sort_exprs.into_iter().collect(); Arc::new(ParquetExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), - file_schema: schema.clone(), - file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]], - statistics: Statistics::new_unknown(schema), - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![sort_exprs], - }, + FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema.clone()) + .with_file(PartitionedFile::new("x".to_string(), 100)) + .with_output_ordering(vec![sort_exprs]), None, None, Default::default(), diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index 1152c70d4391..b03aaabcad6b 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -91,7 +91,7 @@ pub fn scan_partitioned_csv(partitions: usize, work_dir: &Path) -> Result>, -) -> Result { - Ok(FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_schema: schema.clone(), - file_groups, - statistics: Statistics::new_unknown(&schema), - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - }) +) -> FileScanConfig { + FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema) + .with_file_groups(file_groups) } pub fn assert_fields_eq(plan: 
&LogicalPlan, expected: Vec<&str>) { @@ -283,16 +275,9 @@ pub fn csv_exec_sorted( let sort_exprs = sort_exprs.into_iter().collect(); Arc::new(CsvExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), - file_schema: schema.clone(), - file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]], - statistics: Statistics::new_unknown(schema), - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![sort_exprs], - }, + FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema.clone()) + .with_file(PartitionedFile::new("x".to_string(), 100)) + .with_output_ordering(vec![sort_exprs]), false, 0, 0, @@ -345,16 +330,9 @@ pub fn csv_exec_ordered( let sort_exprs = sort_exprs.into_iter().collect(); Arc::new(CsvExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), - file_schema: schema.clone(), - file_groups: vec![vec![PartitionedFile::new("file_path".to_string(), 100)]], - statistics: Statistics::new_unknown(schema), - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![sort_exprs], - }, + FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema.clone()) + .with_file(PartitionedFile::new("file_path".to_string(), 100)) + .with_output_ordering(vec![sort_exprs]), true, 0, b'"', diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs index 1d5668c7ec55..df1d2c6f0999 100644 --- a/datafusion/core/src/test_util/parquet.rs +++ b/datafusion/core/src/test_util/parquet.rs @@ -37,8 +37,6 @@ use crate::physical_plan::metrics::MetricsSet; use crate::physical_plan::ExecutionPlan; use crate::prelude::{Expr, SessionConfig, SessionContext}; -use datafusion_common::Statistics; - use object_store::path::Path; use object_store::ObjectMeta; use parquet::arrow::ArrowWriter; @@ -144,22 +142,15 @@ impl TestParquetFile { ctx: &SessionContext, maybe_filter: Option, ) -> Result> { - let scan_config = FileScanConfig { - object_store_url: self.object_store_url.clone(), - file_schema: self.schema.clone(), - file_groups: vec![vec![PartitionedFile { - object_meta: self.object_meta.clone(), - partition_values: vec![], - range: None, - statistics: None, - extensions: None, - }]], - statistics: Statistics::new_unknown(&self.schema), - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - }; + let scan_config = + FileScanConfig::new(self.object_store_url.clone(), self.schema.clone()) + .with_file(PartitionedFile { + object_meta: self.object_meta.clone(), + partition_values: vec![], + range: None, + statistics: None, + extensions: None, + }); let df_schema = self.schema.clone().to_dfschema_ref()?; diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs index e4f4d229c416..4f50c55c627c 100644 --- a/datafusion/core/tests/parquet/custom_reader.rs +++ b/datafusion/core/tests/parquet/custom_reader.rs @@ -30,8 +30,8 @@ use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::datasource::physical_plan::{ FileMeta, FileScanConfig, ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory, }; +use datafusion::physical_plan::collect; use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; -use datafusion::physical_plan::{collect, Statistics}; use datafusion::prelude::SessionContext; use datafusion_common::Result; @@ -63,7 +63,7 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() { let file_schema 
= batch.schema().clone(); let (in_memory_object_store, parquet_files_meta) = store_parquet_in_memory(vec![batch]).await; - let file_groups = parquet_files_meta + let file_group = parquet_files_meta .into_iter() .map(|meta| PartitionedFile { object_meta: meta, @@ -76,17 +76,12 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() { // prepare the scan let parquet_exec = ParquetExec::new( - FileScanConfig { + FileScanConfig::new( // just any url that doesn't point to in memory object store - object_store_url: ObjectStoreUrl::local_filesystem(), - file_groups: vec![file_groups], - statistics: Statistics::new_unknown(&file_schema), + ObjectStoreUrl::local_filesystem(), file_schema, - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - }, + ) + .with_file_group(file_group), None, None, Default::default(), diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index 8f42f21834cc..2e9cda40c330 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -27,7 +27,7 @@ use datafusion::execution::context::SessionState; use datafusion::physical_plan::metrics::MetricValue; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; -use datafusion_common::{ScalarValue, Statistics, ToDFSchema}; +use datafusion_common::{ScalarValue, ToDFSchema}; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::{col, lit, Expr}; use datafusion_physical_expr::create_physical_expr; @@ -71,17 +71,7 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> ParquetExec { let predicate = create_physical_expr(&filter, &df_schema, &execution_props).unwrap(); let parquet_exec = ParquetExec::new( - FileScanConfig { - object_store_url, - file_groups: vec![vec![partitioned_file]], - file_schema: schema.clone(), - statistics: Statistics::new_unknown(&schema), - // file has 10 cols so index 12 should be month - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - }, + FileScanConfig::new(object_store_url, schema).with_file(partitioned_file), Some(predicate), None, Default::default(), diff --git a/datafusion/core/tests/parquet/schema_adapter.rs b/datafusion/core/tests/parquet/schema_adapter.rs index 10c4e8a4c059..ead2884e43c5 100644 --- a/datafusion/core/tests/parquet/schema_adapter.rs +++ b/datafusion/core/tests/parquet/schema_adapter.rs @@ -30,7 +30,7 @@ use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::datasource::physical_plan::{ FileScanConfig, ParquetExec, SchemaAdapter, SchemaAdapterFactory, SchemaMapper, }; -use datafusion::physical_plan::{collect, Statistics}; +use datafusion::physical_plan::collect; use datafusion::prelude::SessionContext; use datafusion::datasource::listing::PartitionedFile; @@ -83,16 +83,8 @@ async fn can_override_schema_adapter() { // prepare the scan let parquet_exec = ParquetExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_groups: vec![vec![partitioned_file]], - statistics: Statistics::new_unknown(&schema), - file_schema: schema, - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - }, + FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema) + .with_file(partitioned_file), None, None, Default::default(), diff --git a/datafusion/core/tests/parquet/schema_coercion.rs b/datafusion/core/tests/parquet/schema_coercion.rs index 
88f795d2a4fe..ac51b4f71201 100644 --- a/datafusion/core/tests/parquet/schema_coercion.rs +++ b/datafusion/core/tests/parquet/schema_coercion.rs @@ -26,7 +26,7 @@ use datafusion::assert_batches_sorted_eq; use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec}; use datafusion::physical_plan::collect; use datafusion::prelude::SessionContext; -use datafusion_common::{Result, Statistics}; +use datafusion_common::Result; use datafusion_execution::object_store::ObjectStoreUrl; use object_store::path::Path; @@ -51,7 +51,7 @@ async fn multi_parquet_coercion() { let batch2 = RecordBatch::try_from_iter(vec![("c2", c2), ("c3", c3)]).unwrap(); let (meta, _files) = store_parquet(vec![batch1, batch2]).await.unwrap(); - let file_groups = meta.into_iter().map(Into::into).collect(); + let file_group = meta.into_iter().map(Into::into).collect(); // cast c1 to utf8, c2 to int32, c3 to float64 let file_schema = Arc::new(Schema::new(vec![ @@ -60,16 +60,8 @@ async fn multi_parquet_coercion() { Field::new("c3", DataType::Float64, true), ])); let parquet_exec = ParquetExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_groups: vec![file_groups], - statistics: Statistics::new_unknown(&file_schema), - file_schema, - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - }, + FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema) + .with_file_group(file_group), None, None, Default::default(), @@ -115,7 +107,7 @@ async fn multi_parquet_coercion_projection() { RecordBatch::try_from_iter(vec![("c2", c2), ("c1", c1s), ("c3", c3)]).unwrap(); let (meta, _files) = store_parquet(vec![batch1, batch2]).await.unwrap(); - let file_groups = meta.into_iter().map(Into::into).collect(); + let file_group = meta.into_iter().map(Into::into).collect(); // cast c1 to utf8, c2 to int32, c3 to float64 let file_schema = Arc::new(Schema::new(vec![ @@ -124,16 +116,9 @@ async fn multi_parquet_coercion_projection() { Field::new("c3", DataType::Float64, true), ])); let parquet_exec = ParquetExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_groups: vec![file_groups], - statistics: Statistics::new_unknown(&file_schema), - file_schema, - projection: Some(vec![1, 0, 2]), - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - }, + FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema) + .with_file_group(file_group) + .with_projection(Some(vec![1, 0, 2])), None, None, Default::default(), diff --git a/datafusion/execution/src/object_store.rs b/datafusion/execution/src/object_store.rs index 126f83f7e238..7697c01d63f2 100644 --- a/datafusion/execution/src/object_store.rs +++ b/datafusion/execution/src/object_store.rs @@ -51,7 +51,14 @@ impl ObjectStoreUrl { Ok(Self { url: parsed }) } - /// An [`ObjectStoreUrl`] for the local filesystem + /// An [`ObjectStoreUrl`] for the local filesystem (`file://`) + /// + /// # Example + /// ``` + /// # use datafusion_execution::object_store::ObjectStoreUrl; + /// let local_fs = ObjectStoreUrl::parse("file://").unwrap(); + /// assert_eq!(local_fs, ObjectStoreUrl::local_filesystem()) + /// ``` pub fn local_filesystem() -> Self { Self::parse("file://").unwrap() } diff --git a/datafusion/substrait/src/physical_plan/consumer.rs b/datafusion/substrait/src/physical_plan/consumer.rs index 50b08e7793f0..68f8b02b0f09 100644 --- a/datafusion/substrait/src/physical_plan/consumer.rs +++ b/datafusion/substrait/src/physical_plan/consumer.rs @@ 
-24,7 +24,7 @@ use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec}; use datafusion::error::{DataFusionError, Result}; -use datafusion::physical_plan::{ExecutionPlan, Statistics}; +use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; use async_recursion::async_recursion; @@ -104,16 +104,11 @@ pub async fn from_substrait_rel( file_groups[part_index].push(partitioned_file) } - let mut base_config = FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_schema: Arc::new(Schema::empty()), - file_groups, - statistics: Statistics::new_unknown(&Schema::empty()), - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - }; + let mut base_config = FileScanConfig::new( + ObjectStoreUrl::local_filesystem(), + Arc::new(Schema::empty()), + ) + .with_file_groups(file_groups); if let Some(MaskExpression { select, .. }) = &read.projection { if let Some(projection) = &select.as_ref() { diff --git a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs index 70887e393491..aca044319406 100644 --- a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs @@ -23,7 +23,7 @@ use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec}; use datafusion::error::Result; -use datafusion::physical_plan::{displayable, ExecutionPlan, Statistics}; +use datafusion::physical_plan::{displayable, ExecutionPlan}; use datafusion::prelude::SessionContext; use datafusion_substrait::physical_plan::{consumer, producer}; @@ -31,25 +31,20 @@ use substrait::proto::extensions; #[tokio::test] async fn parquet_exec() -> Result<()> { - let scan_config = FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_schema: Arc::new(Schema::empty()), - file_groups: vec![ - vec![PartitionedFile::new( - "file://foo/part-0.parquet".to_string(), - 123, - )], - vec![PartitionedFile::new( - "file://foo/part-1.parquet".to_string(), - 123, - )], - ], - statistics: Statistics::new_unknown(&Schema::empty()), - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - }; + let scan_config = FileScanConfig::new( + ObjectStoreUrl::local_filesystem(), + Arc::new(Schema::empty()), + ) + .with_file_groups(vec![ + vec![PartitionedFile::new( + "file://foo/part-0.parquet".to_string(), + 123, + )], + vec![PartitionedFile::new( + "file://foo/part-1.parquet".to_string(), + 123, + )], + ]); let parquet_exec: Arc = Arc::new(ParquetExec::new( scan_config, None, From 4bbfb42d1718d14adba59c912b3934649cbbe2d6 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 23 May 2024 08:38:14 -0400 Subject: [PATCH 2/8] Remove add_* api --- .../physical_plan/file_scan_config.rs | 45 +++++++------------ 1 file changed, 15 insertions(+), 30 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 16c5ce3a9ba6..f5d3c7a6410d 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -75,19 +75,17 @@ pub fn wrap_partition_value_in_dict(val: 
ScalarValue) -> ScalarValue {
 /// # let file_schema = Arc::new(Schema::empty());
 /// // create FileScan config for reading data from file://
 /// let object_store_url = ObjectStoreUrl::local_filesystem();
-/// let mut config = FileScanConfig::new(object_store_url, file_schema)
+/// let config = FileScanConfig::new(object_store_url, file_schema)
 ///   .with_limit(Some(1000))            // read only the first 1000 records
-///   .with_projection(Some(vec![2, 3])); // project columns 2 and 3
-///
-/// // Read /tmp/file1.parquet with known size of 1234 bytes in a single group
-/// config.add_file(PartitionedFile::new("file1.parquet", 1234));
-///
-/// // Read /tmp/file2.parquet 56 bytes and /tmp/file3.parquet 78 bytes
-/// // in a single file group
-/// config.add_file_group(vec![
+///   .with_projection(Some(vec![2, 3]))  // project columns 2 and 3
+///   // Read /tmp/file1.parquet with known size of 1234 bytes in a single group
+///   .with_file(PartitionedFile::new("file1.parquet", 1234))
+///   // Read /tmp/file2.parquet 56 bytes and /tmp/file3.parquet 78 bytes
+///   // in a single file group
+///   .with_file_group(vec![
 ///   PartitionedFile::new("file2.parquet", 56),
 ///   PartitionedFile::new("file3.parquet", 78),
-/// ]);
+///   ]);
 /// ```
 #[derive(Clone)]
 pub struct FileScanConfig {
@@ -135,8 +133,10 @@ pub struct FileScanConfig {
 impl FileScanConfig {
     /// Create a new `FileScanConfig` with default settings for scanning files.
     ///
-    /// No file groups are added by default. See [`Self::add_file`] and
-    /// [`Self::add_file_group`]
+    /// See example on [`FileScanConfig`]
+    ///
+    /// No file groups are added by default. See [`Self::with_file`], [`Self::with_file_group`] and
+    /// [`Self::with_file_groups`].
     ///
     /// # Parameters:
     /// * `object_store_url`: See [`Self::object_store_url`]
     /// * `file_schema`: See [`Self::file_schema`]
@@ -155,20 +155,6 @@ impl FileScanConfig {
         }
     }
 
-    /// Add a new file as a single file group
-    ///
-    /// See [Self::file_groups] for more information
-    pub fn add_file(&mut self, file: PartitionedFile) {
-        self.add_file_group(vec![file])
-    }
-
-    /// Add a new file group
-    ///
-    /// See [Self::file_groups] for more information
-    pub fn add_file_group(&mut self, file_group: Vec<PartitionedFile>) {
-        self.file_groups.push(file_group);
-    }
-
     /// Set the statistics of the files
     pub fn with_statistics(mut self, statistics: Statistics) -> Self {
         self.statistics = statistics;
         self
@@ -190,9 +176,8 @@ impl FileScanConfig {
     /// Add a file as a single group
     ///
     /// See [Self::file_groups] for more information.
- pub fn with_file(mut self, file: PartitionedFile) -> Self { - self.add_file(file); - self + pub fn with_file(self, file: PartitionedFile) -> Self { + self.with_file_group(vec![file]) } /// Add the file groups @@ -210,7 +195,7 @@ impl FileScanConfig { /// /// See [Self::file_groups] for more information pub fn with_file_group(mut self, file_group: Vec) -> Self { - self.add_file_group(file_group); + self.file_groups.push(file_group); self } From 1b4884decae2db609949044d9d120bdb350c21d3 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 23 May 2024 07:59:52 -0400 Subject: [PATCH 3/8] Improve ParquetExec and related documentation --- .../datasource/physical_plan/parquet/mod.rs | 106 ++++++++++++++++-- .../physical_plan/parquet/schema_adapter.rs | 24 ++-- 2 files changed, 112 insertions(+), 18 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 17cb6a66c705..f4d19ac92eba 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -75,7 +75,79 @@ pub use metrics::ParquetFileMetrics; pub use schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper}; pub use statistics::{RequestedStatistics, StatisticsConverter}; -/// Execution plan for scanning one or more Parquet partitions +/// Execution plan for reading one or more Parquet files. +/// +/// ```text +/// ▲ +/// │ +/// │ Produce a stream of +/// │ RecordBatches +/// │ +/// ┌───────────────────────┐ +/// │ │ +/// │ ParquetExec │ +/// │ │ +/// └───────────────────────┘ +/// ▲ +/// │ Asynchronously read from one +/// │ or more parquet files via +/// │ ObjectStore interface +/// │ +/// │ +/// .───────────────────. +/// │ ) +/// │`───────────────────'│ +/// │ ObjectStore │ +/// │.───────────────────.│ +/// │ ) +/// `───────────────────' +/// +/// ``` +/// # Features +/// +/// Supports the following optimizations: +/// +/// * Multi-threaded (aka multi-partition): read from one or more files in +/// parallel. Can read concurrently from multiple row groups from a single file. +/// +/// * Predicate push down: skips row groups and pages based on +/// min/max/null_counts in the row group metadata, the page index and bloom +/// filters. +/// +/// * Projection pushdown: reads and decodes only the columns required. +/// +/// * Limit pushdown: stop execution early after some number of rows are read. +/// +/// * Custom readers: controls I/O for accessing pages. See +/// [`ParquetFileReaderFactory`] for more details. +/// +/// * Schema adapters: read parquet files with different schemas into a unified +/// table schema. This can be used to implement "schema evolution". See +/// [`SchemaAdapterFactory`] for more details. +/// +/// * metadata_size_hint: controls the number of bytes read from the end of the +/// file in the initial I/O. +/// +/// # Execution Overview +/// +/// * Step 1: [`ParquetExec::execute`] is called, returning a [`FileStream`] +/// configured to open parquet files with a [`ParquetOpener`]. +/// +/// * Step 2: When the stream is polled, the [`ParquetOpener`] is called to open +/// the file. +/// +/// * Step 3: The `ParquetOpener` gets the file metadata by reading the footer, +/// and applies any predicates and projections to determine what pages must be +/// read. +/// +/// * Step 4: The stream begins reading data, fetching the required pages +/// and incrementally decoding them. 
+///
+/// * Step 5: As each [`RecordBatch`] is read, it may be adapted by a
+/// [`SchemaAdapter`] to match the table schema. By default missing columns are
+/// filled with nulls, but this can be customized via [`SchemaAdapterFactory`].
+///
+/// [`RecordBatch`]: arrow::record_batch::RecordBatch
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
     /// Base configuration for this scan
@@ -85,9 +157,9 @@ pub struct ParquetExec {
     metrics: ExecutionPlanMetricsSet,
     /// Optional predicate for row filtering during parquet scan
     predicate: Option<Arc<dyn PhysicalExpr>>,
-    /// Optional predicate for pruning row groups
+    /// Optional predicate for pruning row groups (derived from `predicate`)
     pruning_predicate: Option<Arc<PruningPredicate>>,
-    /// Optional predicate for pruning pages
+    /// Optional predicate for pruning pages (derived from `predicate`)
     page_pruning_predicate: Option<Arc<PagePruningPredicate>>,
     /// Optional hint for the size of the parquet metadata
     metadata_size_hint: Option<usize>,
@@ -642,11 +714,22 @@ fn should_enable_page_index(
         .unwrap_or(false)
 }
 
-/// Factory of parquet file readers.
+/// Interface for creating [`AsyncFileReader`]s to read parquet files.
+///
+/// This interface is used by [`ParquetOpener`] in order to create readers for
+/// parquet files. Implementations of this trait can be used to provide custom
+/// data access operations such as pre-cached data, I/O coalescing, etc.
 ///
-/// Provides means to implement custom data access interface.
+/// [`DefaultParquetFileReaderFactory`] by default returns a
+/// [`ParquetObjectReader`].
 pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
-    /// Provides `AsyncFileReader` over parquet file specified in `FileMeta`
+    /// Provides an `AsyncFileReader` for reading data from the parquet file specified in `FileMeta`
+    ///
+    /// # Arguments
+    /// * partition_index - Index of the partition (for reporting metrics)
+    /// * file_meta - The file to be read
+    /// * metadata_size_hint - If specified, the first IO reads this many bytes from the footer
+    /// * metrics - Execution metrics
     fn create_reader(
         &self,
         partition_index: usize,
@@ -663,13 +746,20 @@ pub struct DefaultParquetFileReaderFactory {
 }
 
 impl DefaultParquetFileReaderFactory {
-    /// Create a factory.
+    /// Create a new `DefaultParquetFileReaderFactory`.
     pub fn new(store: Arc<dyn ObjectStore>) -> Self {
         Self { store }
     }
 }
 
-/// Implements [`AsyncFileReader`] for a parquet file in object storage
+/// Implements [`AsyncFileReader`] for a parquet file in object storage.
+///
+/// This implementation uses the [`ParquetObjectReader`] to read data from the
+/// object store on demand, as required, tracking the number of bytes read.
+///
+/// This implementation does not coalesce I/O operations or cache bytes. Such
+/// optimizations can be done either at the object store level or by providing a
+/// custom implementation of [`ParquetFileReaderFactory`].
 pub(crate) struct ParquetFileReader {
     file_metrics: ParquetFileMetrics,
     inner: ParquetObjectReader,
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs b/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs
index 193e5161a398..ac053214fdfb 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs
@@ -20,27 +20,29 @@ use arrow_schema::{Schema, SchemaRef};
 use std::fmt::Debug;
 use std::sync::Arc;
 
-/// Factory of schema adapters.
+/// Factory for creating [`SchemaAdapter`]
 ///
-/// Provides means to implement custom schema adaptation.
+/// This interface provides a way to implement custom schema adaptation logic
+/// for ParquetExec (for example, to fill missing columns with a default value
+/// other than null)
 pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
     /// Provides `SchemaAdapter` for the ParquetExec.
     fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter>;
 }
 
-/// A utility which can adapt file-level record batches to a table schema which may have a schema
+/// Adapt file-level [`RecordBatch`]es to a table schema, which may have a schema
 /// obtained from merging multiple file-level schemas.
 ///
 /// This is useful for enabling schema evolution in partitioned datasets.
 ///
 /// This has to be done in two stages.
 ///
-/// 1. Before reading the file, we have to map projected column indexes from the table schema to
-/// the file schema.
+/// 1. Before reading the file, we have to map projected column indexes from the
+/// table schema to the file schema.
 ///
-/// 2. After reading a record batch we need to map the read columns back to the expected columns
-/// indexes and insert null-valued columns wherever the file schema was missing a colum present
-/// in the table schema.
+/// 2. After reading a record batch, map the read columns back to the expected
+/// column indexes and insert null-valued columns wherever the file schema was
+/// missing a column present in the table schema.
 pub trait SchemaAdapter: Send + Sync {
     /// Map a column index in the table schema to a column index in a particular
     /// file schema
     ///
     /// Panics if index is not in range for the table schema
     fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;
 
-    /// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema.
+    /// Creates a `SchemaMapping` that can be used to cast or map the columns
+    /// from the file schema to the table schema.
     ///
     /// If the provided `file_schema` contains columns of a different type to the expected
     /// `table_schema`, the method will attempt to cast the array data from the file schema
     /// to the table schema where possible.
     ///
     /// Returns a [`SchemaMapping`] that can be applied to the output batch
     /// along with an ordered list of columns to project from the file
     fn map_schema(
         &self,
         file_schema: &Schema,
     ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
 }
 
-/// Transforms a RecordBatch from Parquet to a RecordBatch that meets the table schema.
+/// Transforms a [`RecordBatch`] read from a Parquet file to a [`RecordBatch`]
+/// that has the table schema.
 pub trait SchemaMapper: Send + Sync {
     /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
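
For orientation, the `SchemaAdapterFactory`, `SchemaAdapter`, and `SchemaMapper` traits documented here fit together roughly as sketched below. This is only an illustrative skeleton: `MyAdapterFactory` and `MyAdapter` are hypothetical names, the import path is assumed from the re-exports in this patch, and `map_schema` is left as a `todo!()` because a real adapter must decide how to cast, reorder, and back-fill columns before returning its `SchemaMapper`.

```rust
use std::sync::Arc;

use arrow_schema::{Schema, SchemaRef};
use datafusion_common::Result;
// Assumed re-export location for the traits; adjust to match the crate layout.
use datafusion::datasource::physical_plan::{
    SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
};

/// Hypothetical adapter factory (the name is illustrative only)
#[derive(Debug)]
struct MyAdapterFactory;

impl SchemaAdapterFactory for MyAdapterFactory {
    fn create(&self, table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
        Box::new(MyAdapter { table_schema })
    }
}

struct MyAdapter {
    table_schema: SchemaRef,
}

impl SchemaAdapter for MyAdapter {
    // Stage 1: resolve a projected table column to the matching file column by name
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
        file_schema
            .index_of(self.table_schema.field(index).name())
            .ok()
    }

    // Stage 2: return the file-column projection to read plus a `SchemaMapper`
    // that rewrites each record batch (casting, reordering, filling gaps)
    fn map_schema(
        &self,
        _file_schema: &Schema,
    ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
        todo!("build the projection and a SchemaMapper for this file")
    }
}
```

A factory like this would then be installed on a scan via `ParquetExec::with_schema_adapter_factory` (or, later in this series, `ParquetExecBuilder::with_schema_adapter_factory`).
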
fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result; From 09f14c25262c830541878eb09f95b222b656ccf5 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 23 May 2024 07:59:52 -0400 Subject: [PATCH 4/8] Add ParquetExec::builder() API --- .../datasource/physical_plan/parquet/mod.rs | 198 +++++++++++++++--- 1 file changed, 167 insertions(+), 31 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index f4d19ac92eba..8a953aafeb1b 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -103,6 +103,27 @@ pub use statistics::{RequestedStatistics, StatisticsConverter}; /// `───────────────────' /// /// ``` +/// +/// # Example: Create a `ParquetExec` +/// ``` +/// # use std::sync::Arc; +/// # use arrow::datatypes::Schema; +/// # use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec}; +/// # use datafusion::datasource::listing::PartitionedFile; +/// # let file_schema = Arc::new(Schema::empty()); +/// # let object_store_url = ObjectStoreUrl::local_filesystem(); +/// # use datafusion_execution::object_store::ObjectStoreUrl; +/// # use datafusion_physical_expr::expressions::lit; +/// # let predicate = lit(true); +/// // Create a ParquetExec for reading `file1.parquet` with a file size of 100MB +/// let file_scan_config = FileScanConfig::new(object_store_url, file_schema) +/// .with_file(PartitionedFile::new("file1.parquet", 100*1024*1024)); +/// let exec = ParquetExec::builder(file_scan_config) +/// // Provide a predicate for filtering row groups/pages +/// .with_predicate(predicate) +/// .build(); +/// ``` +/// /// # Features /// /// Supports the following optimizations: @@ -118,36 +139,32 @@ pub use statistics::{RequestedStatistics, StatisticsConverter}; /// /// * Limit pushdown: stop execution early after some number of rows are read. /// -/// * Custom readers: controls I/O for accessing pages. See -/// [`ParquetFileReaderFactory`] for more details. +/// * Custom readers: controls I/O for accessing pages, and reading +/// [`ParquetMetadata`]. This can be used to implement custom IO scheduling, and +/// re-using parsed metadata. See [`ParquetFileReaderFactory`] for more details. /// -/// * Schema adapters: read parquet files with different schemas into a unified +/// * Schema adapters:read parquet files with different schemas into a unified /// table schema. This can be used to implement "schema evolution". See /// [`SchemaAdapterFactory`] for more details. /// /// * metadata_size_hint: controls the number of bytes read from the end of the -/// file in the initial I/O. +/// file in the initial I/O. See [`ParquetExecBuilder::with_metadata_size_hint`] +/// for details. /// /// # Execution Overview /// -/// * Step 1: [`ParquetExec::execute`] is called, returning a [`FileStream`] +/// * Step 1: `ParquetExec::execute` is called, returning a [`FileStream`] /// configured to open parquet files with a [`ParquetOpener`]. /// -/// * Step 2: When the stream is polled, the [`ParquetOpener`] is called to open +/// * Step 2: When the stream is polled, the `ParquetOpener` is called to open /// the file. /// /// * Step 3: The `ParquetOpener` gets the file metadata by reading the footer, /// and applies any predicates and projections to determine what pages must be /// read. 
/// -/// * Step 4: The stream begins reading data, fetching the required pages +/// * Step 4: The stream begins reading data, by fetching the required pages /// and incrementally decoding them. -/// -/// * Step 5: As each [`RecordBatch]` is read, it may be adapted by a -/// [`SchemaAdapter`] to match the table schema. By default missing columns are -/// filled with nulls, but this can be customized via [`SchemaAdapterFactory`]. -/// -/// [`RecordBatch`]: arrow::record_batch::RecordBatch #[derive(Debug, Clone)] pub struct ParquetExec { /// Base configuration for this scan @@ -173,14 +190,110 @@ pub struct ParquetExec { schema_adapter_factory: Option>, } -impl ParquetExec { - /// Create a new Parquet reader execution plan provided file list and schema. - pub fn new( - base_config: FileScanConfig, - predicate: Option>, - metadata_size_hint: Option, +/// [`ParquetExecBuilder`]`, builder for [`ParquetExec`]. +/// +/// See example on [`ParquetExec`]. +pub struct ParquetExecBuilder { + file_scan_config: FileScanConfig, + predicate: Option>, + metadata_size_hint: Option, + table_parquet_options: TableParquetOptions, + parquet_file_reader_factory: Option>, + schema_adapter_factory: Option>, +} + +impl ParquetExecBuilder { + /// Create a new builder to read the provided file scan configuration + pub fn new(file_scan_config: FileScanConfig) -> Self { + Self::new_with_options(file_scan_config, TableParquetOptions::default()) + } + + /// Create a new builder to read the data specified in the file scan + /// configuration with the provided table parquet options. + pub fn new_with_options( + file_scan_config: FileScanConfig, table_parquet_options: TableParquetOptions, ) -> Self { + Self { + file_scan_config, + predicate: None, + metadata_size_hint: None, + table_parquet_options, + parquet_file_reader_factory: None, + schema_adapter_factory: None, + } + } + + /// Set the predicate for the scan. + /// + /// The ParquetExec uses this predicate to filter row groups and data pages + /// using the Parquet statistics and bloom filters. + pub fn with_predicate(mut self, predicate: Arc) -> Self { + self.predicate = Some(predicate); + self + } + + /// Set the metadata size hint + /// + /// This value determines how many bytes at the end of the file the + /// ParquetExec will request in the initial IO. If this is too small, the + /// ParquetExec will need to make additional IO requests to read the footer. + pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self { + self.metadata_size_hint = Some(metadata_size_hint); + self + } + + /// Set the table parquet options that control how the ParquetExec reads. + /// + /// See also [`Self::new_with_options`] + pub fn with_table_parquet_options( + mut self, + table_parquet_options: TableParquetOptions, + ) -> Self { + self.table_parquet_options = table_parquet_options; + self + } + + /// Set optional user defined parquet file reader factory. + /// + /// `ParquetFileReaderFactory` complements `TableProvider`, It enables users to provide custom + /// implementation for data access operations. + /// + /// If custom `ParquetFileReaderFactory` is provided, then data access operations will be routed + /// to this factory instead of `ObjectStore`. + pub fn with_parquet_file_reader_factory( + mut self, + parquet_file_reader_factory: Arc, + ) -> Self { + self.parquet_file_reader_factory = Some(parquet_file_reader_factory); + self + } + + /// Set optional schema adapter factory. 
+ /// + /// `SchemaAdapterFactory` allows user to specify how fields from the parquet file get mapped to + /// that of the table schema. The default schema adapter uses arrow's cast library to map + /// the parquet fields to the table schema. + pub fn with_schema_adapter_factory( + mut self, + schema_adapter_factory: Arc, + ) -> Self { + self.schema_adapter_factory = Some(schema_adapter_factory); + self + } + + /// Build a [`ParquetExec`] + pub fn build(self) -> ParquetExec { + let Self { + file_scan_config, + predicate, + metadata_size_hint, + table_parquet_options, + parquet_file_reader_factory, + schema_adapter_factory, + } = self; + + let base_config = file_scan_config; debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate: {:?}, limit: {:?}", base_config.file_groups, base_config.projection, predicate, base_config.limit); @@ -219,12 +332,12 @@ impl ParquetExec { let (projected_schema, projected_statistics, projected_output_ordering) = base_config.project(); - let cache = Self::compute_properties( + let cache = ParquetExec::compute_properties( projected_schema, &projected_output_ordering, &base_config, ); - Self { + ParquetExec { base_config, projected_statistics, metrics, @@ -232,11 +345,40 @@ impl ParquetExec { pruning_predicate, page_pruning_predicate, metadata_size_hint, - parquet_file_reader_factory: None, + parquet_file_reader_factory, cache, table_parquet_options, - schema_adapter_factory: None, + schema_adapter_factory, + } + } +} + +impl ParquetExec { + /// Create a new Parquet reader execution plan provided file list and schema. + //#[deprecated(since = "39.0.0", note = "use builder instead")] + pub fn new( + base_config: FileScanConfig, + predicate: Option>, + metadata_size_hint: Option, + table_parquet_options: TableParquetOptions, + ) -> Self { + let mut builder = + ParquetExecBuilder::new_with_options(base_config, table_parquet_options); + if let Some(predicate) = predicate { + builder = builder.with_predicate(predicate); + } + if let Some(metadata_size_hint) = metadata_size_hint { + builder = builder.with_metadata_size_hint(metadata_size_hint); } + builder.build() + } + + /// Return a [`ParquetExecBuilder`]. + /// + /// See example on [`ParquetExec`] and [`ParquetExecBuilder`] for specifying + /// parquet table options. + pub fn builder(file_scan_config: FileScanConfig) -> ParquetExecBuilder { + ParquetExecBuilder::new(file_scan_config) } /// [`FileScanConfig`] that controls this scan (such as which files to read) @@ -261,11 +403,7 @@ impl ParquetExec { /// Optional user defined parquet file reader factory. /// - /// `ParquetFileReaderFactory` complements `TableProvider`, It enables users to provide custom - /// implementation for data access operations. - /// - /// If custom `ParquetFileReaderFactory` is provided, then data access operations will be routed - /// to this factory instead of `ObjectStore`. + /// See documentation on [`ParquetExecBuilder::with_parquet_file_reader_factory`] pub fn with_parquet_file_reader_factory( mut self, parquet_file_reader_factory: Arc, @@ -276,9 +414,7 @@ impl ParquetExec { /// Optional schema adapter factory. /// - /// `SchemaAdapterFactory` allows user to specify how fields from the parquet file get mapped to - /// that of the table schema. The default schema adapter uses arrow's cast library to map - /// the parquet fields to the table schema. 
+ /// See documentation on [`ParquetExecBuilder::with_schema_adapter_factory`] pub fn with_schema_adapter_factory( mut self, schema_adapter_factory: Arc, From 313252e3c57f6fc525462b9a438043325c4166a5 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 24 May 2024 05:33:32 -0400 Subject: [PATCH 5/8] Deprecate ParquetExec::new and update code --- .../src/datasource/file_format/parquet.rs | 23 ++++++---- .../datasource/physical_plan/parquet/mod.rs | 44 ++++++++++--------- .../combine_partial_final_agg.rs | 8 ++-- .../enforce_distribution.rs | 16 +++---- .../core/src/physical_optimizer/test_utils.rs | 16 +++---- datafusion/core/src/test_util/parquet.rs | 22 +++++----- .../core/tests/parquet/custom_reader.rs | 6 +-- datafusion/core/tests/parquet/page_pruning.rs | 11 +++-- .../core/tests/parquet/schema_adapter.rs | 6 +-- .../core/tests/parquet/schema_coercion.rs | 16 +++---- datafusion/proto/src/physical_plan/mod.rs | 11 +++-- .../tests/cases/roundtrip_physical_plan.rs | 18 +++----- .../substrait/src/physical_plan/consumer.rs | 8 +--- .../tests/cases/roundtrip_physical_plan.rs | 8 +--- 14 files changed, 93 insertions(+), 120 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 7fcd41049cb4..47fe6361313f 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -31,8 +31,7 @@ use crate::arrow::array::{ use crate::arrow::datatypes::{DataType, Fields, Schema, SchemaRef}; use crate::datasource::file_format::file_compression_type::FileCompressionType; use crate::datasource::physical_plan::{ - DefaultSchemaAdapterFactory, FileGroupDisplay, FileSinkConfig, ParquetExec, - SchemaAdapterFactory, + DefaultSchemaAdapterFactory, FileGroupDisplay, FileSinkConfig, SchemaAdapterFactory, }; use crate::datasource::statistics::{create_max_min_accs, get_col_stats}; use crate::error::Result; @@ -75,6 +74,7 @@ use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::task::JoinSet; +use crate::datasource::physical_plan::parquet::ParquetExecBuilder; use futures::{StreamExt, TryStreamExt}; use hashbrown::HashMap; use object_store::path::Path; @@ -253,17 +253,22 @@ impl FileFormat for ParquetFormat { conf: FileScanConfig, filters: Option<&Arc>, ) -> Result> { + let mut builder = + ParquetExecBuilder::new_with_options(conf, self.options.clone()); + // If enable pruning then combine the filters to build the predicate. // If disable pruning then set the predicate to None, thus readers // will not prune data based on the statistics. 
- let predicate = self.enable_pruning().then(|| filters.cloned()).flatten(); + if self.enable_pruning() { + if let Some(predicate) = filters.cloned() { + builder = builder.with_predicate(predicate); + } + } + if let Some(metadata_size_hint) = self.metadata_size_hint() { + builder = builder.with_metadata_size_hint(metadata_size_hint); + } - Ok(Arc::new(ParquetExec::new( - conf, - predicate, - self.metadata_size_hint(), - self.options.clone(), - ))) + Ok(builder.build_arc()) } async fn create_writer_physical_plan( diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 8a953aafeb1b..0fc56fcb8ac0 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -282,6 +282,11 @@ impl ParquetExecBuilder { self } + /// Convenience: build an `Arc`d `ParquetExec` from this builder + pub fn build_arc(self) -> Arc { + Arc::new(self.build()) + } + /// Build a [`ParquetExec`] pub fn build(self) -> ParquetExec { let Self { @@ -355,7 +360,10 @@ impl ParquetExecBuilder { impl ParquetExec { /// Create a new Parquet reader execution plan provided file list and schema. - //#[deprecated(since = "39.0.0", note = "use builder instead")] + #[deprecated( + since = "39.0.0", + note = "use `ParquetExec::builder` or `ParquetExecBuilder`" + )] pub fn new( base_config: FileScanConfig, predicate: Option>, @@ -1157,15 +1165,17 @@ mod tests { let predicate = predicate.map(|p| logical2physical(&p, &file_schema)); // prepare the scan - let mut parquet_exec = ParquetExec::new( + let mut builder = ParquetExec::builder( FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema) .with_file_group(file_group) .with_projection(projection), - predicate, - None, - Default::default(), ); + if let Some(predicate) = predicate { + builder = builder.with_predicate(predicate); + } + let mut parquet_exec = builder.build(); + if pushdown_predicate { parquet_exec = parquet_exec .with_pushdown_filters(true) @@ -1808,13 +1818,11 @@ mod tests { expected_row_num: Option, file_schema: SchemaRef, ) -> Result<()> { - let parquet_exec = ParquetExec::new( + let parquet_exec = ParquetExec::builder( FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema) .with_file_groups(file_groups), - None, - None, - Default::default(), - ); + ) + .build(); assert_eq!( parquet_exec .properties() @@ -1910,7 +1918,7 @@ mod tests { ), ]); - let parquet_exec = ParquetExec::new( + let parquet_exec = ParquetExec::builder( FileScanConfig::new(object_store_url, schema.clone()) .with_file(partitioned_file) // file has 10 cols so index 12 should be month and 13 should be day @@ -1927,10 +1935,8 @@ mod tests { false, ), ]), - None, - None, - Default::default(), - ); + ) + .build(); assert_eq!( parquet_exec.cache.output_partitioning().partition_count(), 1 @@ -1985,13 +1991,11 @@ mod tests { }; let file_schema = Arc::new(Schema::empty()); - let parquet_exec = ParquetExec::new( + let parquet_exec = ParquetExec::builder( FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema) .with_file(partitioned_file), - None, - None, - Default::default(), - ); + ) + .build(); let mut results = parquet_exec.execute(0, state.task_ctx())?; let batch = results.next().await.unwrap(); diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs index b93f4012b093..909c8acdb816 100644 --- 
a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs +++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs @@ -245,16 +245,14 @@ mod tests { } fn parquet_exec(schema: &SchemaRef) -> Arc { - Arc::new(ParquetExec::new( + ParquetExec::builder( FileScanConfig::new( ObjectStoreUrl::parse("test:///").unwrap(), schema.clone(), ) .with_file(PartitionedFile::new("x".to_string(), 100)), - None, - None, - Default::default(), - )) + ) + .build_arc() } fn partial_aggregate_exec( diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 033cec53019d..9442ad7136b6 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1431,14 +1431,12 @@ pub(crate) mod tests { pub(crate) fn parquet_exec_with_sort( output_ordering: Vec>, ) -> Arc { - Arc::new(ParquetExec::new( + ParquetExec::builder( FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema()) .with_file(PartitionedFile::new("x".to_string(), 100)) .with_output_ordering(output_ordering), - None, - None, - Default::default(), - )) + ) + .build_arc() } fn parquet_exec_multiple() -> Arc { @@ -1449,17 +1447,15 @@ pub(crate) mod tests { fn parquet_exec_multiple_sorted( output_ordering: Vec>, ) -> Arc { - Arc::new(ParquetExec::new( + ParquetExec::builder( FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema()) .with_file_groups(vec![ vec![PartitionedFile::new("x".to_string(), 100)], vec![PartitionedFile::new("y".to_string(), 100)], ]) .with_output_ordering(output_ordering), - None, - None, - Default::default(), - )) + ) + .build_arc() } fn csv_exec() -> Arc { diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs index 4d926847e465..cfd0312f813d 100644 --- a/datafusion/core/src/physical_optimizer/test_utils.rs +++ b/datafusion/core/src/physical_optimizer/test_utils.rs @@ -274,13 +274,11 @@ pub fn sort_preserving_merge_exec( /// Create a non sorted parquet exec pub fn parquet_exec(schema: &SchemaRef) -> Arc { - Arc::new(ParquetExec::new( + ParquetExec::builder( FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema.clone()) .with_file(PartitionedFile::new("x".to_string(), 100)), - None, - None, - Default::default(), - )) + ) + .build_arc() } // Created a sorted parquet exec @@ -290,14 +288,12 @@ pub fn parquet_exec_sorted( ) -> Arc { let sort_exprs = sort_exprs.into_iter().collect(); - Arc::new(ParquetExec::new( + ParquetExec::builder( FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema.clone()) .with_file(PartitionedFile::new("x".to_string(), 100)) .with_output_ordering(vec![sort_exprs]), - None, - None, - Default::default(), - )) + ) + .build_arc() } pub fn union_exec(input: Vec>) -> Arc { diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs index df1d2c6f0999..4d6616b9eefa 100644 --- a/datafusion/core/src/test_util/parquet.rs +++ b/datafusion/core/src/test_util/parquet.rs @@ -37,6 +37,7 @@ use crate::physical_plan::metrics::MetricsSet; use crate::physical_plan::ExecutionPlan; use crate::prelude::{Expr, SessionConfig, SessionContext}; +use crate::datasource::physical_plan::parquet::ParquetExecBuilder; use object_store::path::Path; use object_store::ObjectMeta; use parquet::arrow::ArrowWriter; @@ -163,22 +164,19 @@ impl TestParquetFile { let filter = 
simplifier.coerce(filter, &df_schema).unwrap(); let physical_filter_expr = create_physical_expr(&filter, &df_schema, &ExecutionProps::default())?; - let parquet_exec = Arc::new(ParquetExec::new( - scan_config, - Some(physical_filter_expr.clone()), - None, - parquet_options, - )); + + let parquet_exec = + ParquetExecBuilder::new_with_options(scan_config, parquet_options) + .with_predicate(physical_filter_expr.clone()) + .build_arc(); let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?); Ok(exec) } else { - Ok(Arc::new(ParquetExec::new( - scan_config, - None, - None, - parquet_options, - ))) + Ok( + ParquetExecBuilder::new_with_options(scan_config, parquet_options) + .build_arc(), + ) } } diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs index 4f50c55c627c..0e515fd4647b 100644 --- a/datafusion/core/tests/parquet/custom_reader.rs +++ b/datafusion/core/tests/parquet/custom_reader.rs @@ -75,17 +75,15 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() { .collect(); // prepare the scan - let parquet_exec = ParquetExec::new( + let parquet_exec = ParquetExec::builder( FileScanConfig::new( // just any url that doesn't point to in memory object store ObjectStoreUrl::local_filesystem(), file_schema, ) .with_file_group(file_group), - None, - None, - Default::default(), ) + .build() .with_parquet_file_reader_factory(Arc::new(InMemoryParquetFileReaderFactory( Arc::clone(&in_memory_object_store), ))); diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index 2e9cda40c330..15efd4bcd9dd 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -70,13 +70,12 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> ParquetExec { let execution_props = ExecutionProps::new(); let predicate = create_physical_expr(&filter, &df_schema, &execution_props).unwrap(); - let parquet_exec = ParquetExec::new( + ParquetExec::builder( FileScanConfig::new(object_store_url, schema).with_file(partitioned_file), - Some(predicate), - None, - Default::default(), - ); - parquet_exec.with_enable_page_index(true) + ) + .with_predicate(predicate) + .build() + .with_enable_page_index(true) } #[tokio::test] diff --git a/datafusion/core/tests/parquet/schema_adapter.rs b/datafusion/core/tests/parquet/schema_adapter.rs index ead2884e43c5..d6e731711177 100644 --- a/datafusion/core/tests/parquet/schema_adapter.rs +++ b/datafusion/core/tests/parquet/schema_adapter.rs @@ -82,13 +82,11 @@ async fn can_override_schema_adapter() { let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()])); // prepare the scan - let parquet_exec = ParquetExec::new( + let parquet_exec = ParquetExec::builder( FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema) .with_file(partitioned_file), - None, - None, - Default::default(), ) + .build() .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {})); let session_ctx = SessionContext::new(); diff --git a/datafusion/core/tests/parquet/schema_coercion.rs b/datafusion/core/tests/parquet/schema_coercion.rs index ac51b4f71201..af9411f40ecb 100644 --- a/datafusion/core/tests/parquet/schema_coercion.rs +++ b/datafusion/core/tests/parquet/schema_coercion.rs @@ -59,13 +59,11 @@ async fn multi_parquet_coercion() { Field::new("c2", DataType::Int32, true), Field::new("c3", DataType::Float64, true), ])); - let parquet_exec = ParquetExec::new( + let parquet_exec = ParquetExec::builder( 
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema) .with_file_group(file_group), - None, - None, - Default::default(), - ); + ) + .build(); let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); @@ -115,14 +113,12 @@ async fn multi_parquet_coercion_projection() { Field::new("c2", DataType::Int32, true), Field::new("c3", DataType::Float64, true), ])); - let parquet_exec = ParquetExec::new( + let parquet_exec = ParquetExec::builder( FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema) .with_file_group(file_group) .with_projection(Some(vec![1, 0, 2])), - None, - None, - Default::default(), - ); + ) + .build(); let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 0515ed5006aa..4b65d63f6550 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -225,12 +225,11 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { ) }) .transpose()?; - Ok(Arc::new(ParquetExec::new( - base_config, - predicate, - None, - Default::default(), - ))) + let mut builder = ParquetExec::builder(base_config); + if let Some(predicate) = predicate { + builder = builder.with_predicate(predicate) + } + Ok(builder.build_arc()) } PhysicalPlanType::AvroScan(scan) => { Ok(Arc::new(AvroExec::new(parse_protobuf_file_scan_config( diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 79abecf556da..45715c792f21 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -555,12 +555,11 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> { Operator::Eq, lit("1"), )); - roundtrip_test(Arc::new(ParquetExec::new( - scan_config, - Some(predicate), - None, - Default::default(), - ))) + roundtrip_test( + ParquetExec::builder(scan_config) + .with_predicate(predicate) + .build_arc(), + ) } #[tokio::test] @@ -586,12 +585,7 @@ async fn roundtrip_parquet_exec_with_table_partition_cols() -> Result<()> { output_ordering: vec![], }; - roundtrip_test(Arc::new(ParquetExec::new( - scan_config, - None, - None, - Default::default(), - ))) + roundtrip_test(ParquetExec::builder(scan_config).build_arc()) } #[test] diff --git a/datafusion/substrait/src/physical_plan/consumer.rs b/datafusion/substrait/src/physical_plan/consumer.rs index 68f8b02b0f09..39b38c94ec18 100644 --- a/datafusion/substrait/src/physical_plan/consumer.rs +++ b/datafusion/substrait/src/physical_plan/consumer.rs @@ -121,12 +121,8 @@ pub async fn from_substrait_rel( } } - Ok(Arc::new(ParquetExec::new( - base_config, - None, - None, - Default::default(), - )) as Arc) + Ok(ParquetExec::builder(base_config).build_arc() + as Arc) } _ => not_impl_err!( "Only LocalFile reads are supported when parsing physical" diff --git a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs index aca044319406..4014670a7cbc 100644 --- a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs @@ -45,12 +45,8 @@ async fn parquet_exec() -> Result<()> { 123, )], ]); - let parquet_exec: Arc = Arc::new(ParquetExec::new( - scan_config, - None, - None, - Default::default(), - )); + let parquet_exec: Arc = + ParquetExec::builder(scan_config).build_arc(); let mut extension_info: ( 
        Vec,

From c6a0dab37d64c6502d6e309fd620937ea44d8836 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 24 May 2024 05:52:26 -0400
Subject: [PATCH 6/8] fix docs

---
 datafusion/core/src/datasource/physical_plan/parquet/mod.rs | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 0fc56fcb8ac0..38de917df09c 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -165,6 +165,8 @@ pub use statistics::{RequestedStatistics, StatisticsConverter};
 ///
 /// * Step 4: The stream begins reading data, by fetching the required pages
 /// and incrementally decoding them.
+///
+/// [`ParquetMetadata`]: parquet::file::metadata::ParquetMetaData
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
     /// Base configuration for this scan

From 3512028dd7f46b70f45253c4ac0156490e830815 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Thu, 16 May 2024 09:57:32 -0400
Subject: [PATCH 7/8] Add example for building an external index for parquet files

---
 datafusion-examples/README.md                 |   1 +
 datafusion-examples/examples/parquet_index.rs | 727 ++++++++++++++++++
 2 files changed, 728 insertions(+)
 create mode 100644 datafusion-examples/examples/parquet_index.rs

diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md
index 778950cbf926..a5395ea7aab3 100644
--- a/datafusion-examples/README.md
+++ b/datafusion-examples/README.md
@@ -60,6 +60,7 @@ cargo run --example csv_sql
 - [`function_factory.rs`](examples/function_factory.rs): Register `CREATE FUNCTION` handler to implement SQL macros
 - [`make_date.rs`](examples/make_date.rs): Examples of using the make_date function
 - [`memtable.rs`](examples/memtable.rs): Create an query data in memory using SQL and `RecordBatch`es
+- [`parquet_index.rs`](examples/parquet_index.rs): Create a secondary index over several parquet files and use it to speed up queries
 - [`parquet_sql.rs`](examples/parquet_sql.rs): Build and run a query plan from a SQL statement against a local Parquet file
 - [`parquet_sql_multiple_files.rs`](examples/parquet_sql_multiple_files.rs): Build and run a query plan from a SQL statement against multiple local Parquet files
 - ['parquet_exec_visitor.rs'](examples/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution

diff --git a/datafusion-examples/examples/parquet_index.rs b/datafusion-examples/examples/parquet_index.rs
new file mode 100644
index 000000000000..0ed337611389
--- /dev/null
+++ b/datafusion-examples/examples/parquet_index.rs
@@ -0,0 +1,727 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
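+
+//! Run this example with `cargo run --example parquet_index`.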
+ +use arrow::array::{ + Array, ArrayRef, AsArray, BooleanArray, Int32Array, RecordBatch, StringArray, + UInt64Array, +}; +use arrow::datatypes::Int32Type; +use arrow::util::pretty::pretty_format_batches; +use arrow_schema::SchemaRef; +use async_trait::async_trait; +use datafusion::datasource::listing::PartitionedFile; +use datafusion::datasource::physical_plan::parquet::{ + RequestedStatistics, StatisticsConverter, +}; +use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec}; +use datafusion::datasource::TableProvider; +use datafusion::execution::context::SessionState; +use datafusion::execution::object_store::ObjectStoreUrl; +use datafusion::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; +use datafusion::parquet::arrow::ArrowWriter; +use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::prelude::*; +use datafusion_common::config::TableParquetOptions; +use datafusion_common::{ + internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue, Statistics, +}; +use datafusion_expr::utils::conjunction; +use datafusion_expr::{TableProviderFilterPushDown, TableType}; +use datafusion_physical_expr::PhysicalExpr; +use std::any::Any; +use std::collections::HashSet; +use std::fmt::Display; +use std::fs; +use std::fs::{DirEntry, File}; +use std::ops::Range; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use tempfile::TempDir; +use url::Url; + +/// This example demonstrates building a secondary index over multiple Parquet +/// files and using that index during query to skip ("prune") files that do not +/// contain relevant data. +/// +/// This example rules out relevant data using min/max values of a column +/// extracted from the Parquet metadata. In a real system, the index could be +/// more sophisticated, e.g. using inverted indices, bloom filters or other +/// techniques. +/// +/// Note this is a low level example for people who want to build their own +/// custom indexes. To read a directory of parquet files as a table, you can use +/// a higher level API such as [`SessionContext::read_parquet`] or +/// [`ListingTable`], which also do file pruning based on parquet statistics +/// (using the same underlying APIs) +/// +/// For a more advanced example of using an index to prune row groups within a +/// file, see the (forthcoming) `advanced_parquet_index` example. +/// +/// # Diagram +/// +/// ```text +/// ┏━━━━━━━━━━━━━━━━━━━━━━━━┓ +/// ┃ Index ┃ +/// ┃ ┃ +/// step 1: predicate is ┌ ─ ─ ─ ─▶┃ (sometimes referred to ┃ +/// evaluated against ┃ as a "catalog" or ┃ +/// data in the index │ ┃ "metastore") ┃ +/// (using ┗━━━━━━━━━━━━━━━━━━━━━━━━┛ +/// PruningPredicate) │ │ +/// +/// │ │ +/// ┌──────────────┐ +/// │ value = 150 │─ ─ ─ ─ ┘ │ +/// └──────────────┘ ┌─────────────┐ +/// Predicate from query │ │ │ +/// └─────────────┘ +/// │ ┌─────────────┐ +/// step 2: Index returns only ─ ▶│ │ +/// parquet files that might have └─────────────┘ +/// matching data. ... 
+/// ┌─────────────┐ +/// Thus some parquet files are │ │ +/// "pruned" and thus are not └─────────────┘ +/// scanned at all Parquet Files +/// +/// ``` +/// +/// [`ListingTable`]: datafusion::datasource::listing::ListingTable +#[tokio::main] +async fn main() -> Result<()> { + // Demo data has three files, each with schema + // * file_name (string) + // * value (int32) + // + // The files are as follows: + // * file1.parquet (value: 0..100) + // * file2.parquet (value: 100..200) + // * file3.parquet (value: 200..3000) + let data = DemoData::try_new()?; + + // Create a table provider with and our special index. + let provider = Arc::new(IndexTableProvider::try_new(data.path())?); + println!("** Table Provider:"); + println!("{provider}\n"); + + // Create a SessionContext for running queries that has the table provider + // registered as "index_table" + let ctx = SessionContext::new(); + ctx.register_table("index_table", Arc::clone(&provider) as _)?; + + // register object store provider for urls like `file://` work + let url = Url::try_from("file://").unwrap(); + let object_store = object_store::local::LocalFileSystem::new(); + ctx.runtime_env() + .register_object_store(&url, Arc::new(object_store)); + + // Select data from the table without any predicates (and thus no pruning) + println!("** Select data, no predicates:"); + ctx.sql("SELECT file_name, value FROM index_table LIMIT 10") + .await? + .show() + .await?; + println!("Files pruned: {}\n", provider.index().last_num_pruned()); + + // Run a query that uses the index to prune files. + // + // Using the predicate "value = 150", the IndexTable can skip reading file 1 + // (max value 100) and file 3 (min value of 200) + println!("** Select data, predicate `value = 150`"); + ctx.sql("SELECT file_name, value FROM index_table WHERE value = 150") + .await? + .show() + .await?; + println!("Files pruned: {}\n", provider.index().last_num_pruned()); + + // likewise, we can use a more complicated predicate like + // "value < 20 OR value > 500" to read only file 1 and file 3 + println!("** Select data, predicate `value < 20 OR value > 500`"); + ctx.sql( + "SELECT file_name, count(value) FROM index_table \ + WHERE value < 20 OR value > 500 GROUP BY file_name", + ) + .await? + .show() + .await?; + println!("Files pruned: {}\n", provider.index().last_num_pruned()); + + Ok(()) +} + +/// DataFusion `TableProvider` that uses [`IndexTableProvider`], a secondary +/// index to decide which Parquet files to read. +#[derive(Debug)] +pub struct IndexTableProvider { + /// The index of the parquet files in the directory + index: ParquetMetadataIndex, + /// the directory in which the files are stored + dir: PathBuf, +} + +impl Display for IndexTableProvider { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!(f, "IndexTableProvider")?; + writeln!(f, "---- Index ----")?; + write!(f, "{}", self.index) + } +} + +impl IndexTableProvider { + /// Create a new IndexTableProvider + pub fn try_new(dir: impl Into) -> Result { + let dir = dir.into(); + + // Create an index of the parquet files in the directory as we see them. 
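+        // (Each file contributes one row to the index: its file name, size,
+        // row count, and the min/max values of the `value` column.)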
+ let mut index_builder = ParquetMetadataIndexBuilder::new(); + + let files = read_dir(&dir)?; + for file in &files { + index_builder.add_file(&file.path())?; + } + + let index = index_builder.build()?; + + Ok(Self { index, dir }) + } + + /// return a reference to the underlying index + fn index(&self) -> &ParquetMetadataIndex { + &self.index + } +} + +#[async_trait] +impl TableProvider for IndexTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.index.schema().clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + state: &SessionState, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + ) -> Result> { + let df_schema = DFSchema::try_from(self.schema())?; + // convert filters like [`a = 1`, `b = 2`] to a single filter like `a = 1 AND b = 2` + let predicate = conjunction(filters.to_vec()); + let predicate = predicate + .map(|predicate| state.create_physical_expr(predicate, &df_schema)) + .transpose()? + // if there are no filters, use a literal true to have a predicate + // that always evaluates to true we can pass to the index + .unwrap_or_else(|| datafusion_physical_expr::expressions::lit(true)); + + // Use the index to find the files that might have data that matches the + // predicate. Any file that can not have data that matches the predicate + // will not be returned. + let files = self.index.get_files(predicate.clone())?; + + // Transform to the format needed to pass to ParquetExec + // Create one file group per file (default to scanning them all in parallel) + let file_groups = files + .into_iter() + .map(|(file_name, file_size)| { + let path = self.dir.join(file_name); + let canonical_path = fs::canonicalize(path)?; + Ok(vec![PartitionedFile::new( + canonical_path.display().to_string(), + file_size, + )]) + }) + .collect::>>()?; + + // for now, simply use ParquetExec + // TODO make a builder for FileScanConfig + let object_store_url = ObjectStoreUrl::parse("file://")?; + let base_config = FileScanConfig { + object_store_url, + file_schema: self.schema(), + file_groups, + statistics: Statistics::new_unknown(self.index.schema()), + projection: projection.cloned(), + limit, + table_partition_cols: vec![], + output_ordering: vec![], + }; + + let metadata_size_hint = None; + + let table_parquet_options = TableParquetOptions::default(); + + // TODO make a builder for parquet exec + let exec = ParquetExec::new( + base_config, + Some(predicate), + metadata_size_hint, + table_parquet_options, + ); + + Ok(Arc::new(exec)) + } + + /// Tell DataFusion to push filters down to the scan method + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> Result> { + // Inexact because the pruning can't handle all expressions and pruning + // is not done at the row level -- there may be rows in returned files + // that do not pass the filter + Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()]) + } +} + +/// Simple in memory secondary index for a set of parquet files +/// +/// The index is represented as an arrow [`RecordBatch`] that can be passed +/// directly by the DataFusion [`PruningPredicate`] API +/// +/// The `RecordBatch` looks as follows. 
+/// +/// ```text +/// +---------------+-----------+-----------+------------------+------------------+ +/// | file_name | file_size | row_count | value_column_min | value_column_max | +/// +---------------+-----------+-----------+------------------+------------------+ +/// | file1.parquet | 6062 | 100 | 0 | 99 | +/// | file2.parquet | 6062 | 100 | 100 | 199 | +/// | file3.parquet | 163310 | 2800 | 200 | 2999 | +/// +---------------+-----------+-----------+------------------+------------------+ +/// ``` +/// +/// It must store file_name and file_size to construct `PartitionedFile`. +/// +/// Note a more advanced index might store finer grained information, such as information +/// about each row group within a file +#[derive(Debug)] +struct ParquetMetadataIndex { + file_schema: SchemaRef, + /// The index of the parquet files. See the struct level documentation for + /// the schema of this index. + index: RecordBatch, + /// The number of files that were pruned in the last query + last_num_pruned: AtomicUsize, +} + +impl Display for ParquetMetadataIndex { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!( + f, + "ParquetMetadataIndex(last_num_pruned: {})", + self.last_num_pruned() + )?; + let batches = pretty_format_batches(&[self.index.clone()]).unwrap(); + write!(f, "{batches}",) + } +} + +impl ParquetMetadataIndex { + /// the schema of the *files* in the index (not the index's schema) + fn schema(&self) -> &SchemaRef { + &self.file_schema + } + + /// number of files in the index + fn len(&self) -> usize { + self.index.num_rows() + } + + /// Return a [`PartitionedFile`] for the specified file offset + /// + /// For example, if the index batch contained data like + /// + /// ```text + /// fileA + /// fileB + /// fileC + /// ``` + /// + /// `get_file(1)` would return `(fileB, size)` + fn get_file(&self, file_offset: usize) -> (&str, u64) { + // Filenames and sizes are always non null, so we don't have to check is_valid + let file_name = self.file_names().value(file_offset); + let file_size = self.file_size().value(file_offset); + (file_name, file_size) + } + + /// Return the number of files that were pruned in the last query + pub fn last_num_pruned(&self) -> usize { + self.last_num_pruned.load(Ordering::SeqCst) + } + + /// Set the number of files that were pruned in the last query + fn set_last_num_pruned(&self, num_pruned: usize) { + self.last_num_pruned.store(num_pruned, Ordering::SeqCst); + } + + /// Return all the files matching the predicate + /// + /// Returns a tuple `(file_name, file_size)` + pub fn get_files( + &self, + predicate: Arc, + ) -> Result> { + // Use the PruningPredicate API to determine which files can not + // possibly have any relevant data. + let pruning_predicate = + PruningPredicate::try_new(predicate, self.schema().clone())?; + + // Now evaluate the pruning predicate into a boolean mask, one element per + // file in the index. If the mask is true, the file may have rows that + // match the predicate. If the mask is false, we know the file can not have *any* + // rows that match the predicate and thus can be skipped. 
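+        // `prune` evaluates the predicate against the min/max and row count
+        // arrays exposed by the `PruningStatistics` impl below, returning one
+        // boolean per container (here, one per file).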
+ let file_mask = pruning_predicate.prune(self)?; + + let num_left = file_mask.iter().filter(|x| **x).count(); + self.set_last_num_pruned(self.len() - num_left); + + // Return only files that match the predicate from the index + let files_and_sizes: Vec<_> = file_mask + .into_iter() + .enumerate() + .filter_map(|(file, keep)| { + if keep { + Some(self.get_file(file)) + } else { + None + } + }) + .collect(); + Ok(files_and_sizes) + } + + /// Return the file_names column of this index + fn file_names(&self) -> &StringArray { + self.index + .column(0) + .as_any() + .downcast_ref::() + .unwrap() + } + + /// Return the file_size column of this index + fn file_size(&self) -> &UInt64Array { + self.index + .column(1) + .as_any() + .downcast_ref::() + .unwrap() + } + + /// Reference to the row count column + fn row_counts_ref(&self) -> &ArrayRef { + self.index.column(2) + } + + /// Reference to the column minimum values + fn value_column_mins(&self) -> &ArrayRef { + self.index.column(3) + } + + /// Reference to the column maximum values + fn value_column_maxes(&self) -> &ArrayRef { + self.index.column(4) + } +} + +/// In order to use the PruningPredicate API, we need to provide DataFusion +/// the required statistics via the [`PruningStatistics`] trait +impl PruningStatistics for ParquetMetadataIndex { + /// return the minimum values for the value column + fn min_values(&self, column: &Column) -> Option { + if column.name.eq("value") { + Some(self.value_column_mins().clone()) + } else { + None + } + } + + /// return the maximum values for the value column + fn max_values(&self, column: &Column) -> Option { + if column.name.eq("value") { + Some(self.value_column_maxes().clone()) + } else { + None + } + } + + /// return the number of "containers". In this example, each "container" is + /// a file (aka a row in the index) + fn num_containers(&self) -> usize { + self.len() + } + + /// Return `None` to signal we don't have any information about null + /// counts in the index, + fn null_counts(&self, _column: &Column) -> Option { + None + } + + /// return the row counts for each file + fn row_counts(&self, _column: &Column) -> Option { + Some(self.row_counts_ref().clone()) + } + + /// The `contained` API can be used with structures such as Bloom filters, + /// but is not used in this example, so return `None` + fn contained( + &self, + _column: &Column, + _values: &HashSet, + ) -> Option { + None + } +} + +/// Builds a [`ParquetMetadataIndex`] from a set of parquet files +#[derive(Debug, Default)] +struct ParquetMetadataIndexBuilder { + file_schema: Option, + filenames: Vec, + file_sizes: Vec, + row_counts: Vec, + /// Holds the min/max value of the value column for each file + value_column_mins: Vec, + value_column_maxs: Vec, +} + +impl ParquetMetadataIndexBuilder { + fn new() -> Self { + Self::default() + } + + /// Add a new file to the index + fn add_file(&mut self, file: &Path) -> Result<()> { + let file_name = file + .file_name() + .ok_or_else(|| internal_datafusion_err!("No filename"))? + .to_str() + .ok_or_else(|| internal_datafusion_err!("Invalid filename"))?; + let file_size = file.metadata()?.len(); + + let file = File::open(file).map_err(|e| { + DataFusionError::from(e).context(format!("Error opening file {file:?}")) + })?; + + let reader = ParquetRecordBatchReaderBuilder::try_new(file)?; + + // Get the schema of the file. A real system might have to handle the + // case where the schema of the file is not the same as the schema of + // the other files e.g. using SchemaAdapter. 
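+        // (This example simply remembers the schema of the first file it sees
+        // and assumes the remaining files match it.)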
+ if self.file_schema.is_none() { + self.file_schema = Some(reader.schema().clone()); + } + + // extract the parquet statistics from the file's footer + let metadata = reader.metadata(); + + // Extract the min/max values for each row group from the statistics + let row_counts = StatisticsConverter::row_counts(reader.metadata())?; + let value_column_mins = StatisticsConverter::try_new( + "value", + RequestedStatistics::Min, + reader.schema(), + )? + .extract(reader.metadata())?; + let value_column_maxes = StatisticsConverter::try_new( + "value", + RequestedStatistics::Max, + reader.schema(), + )? + .extract(reader.metadata())?; + + // In a real system you would have to handle nulls, which represent + // unknown statistics. All statistics are known in this example + assert_eq!(row_counts.null_count(), 0); + assert_eq!(value_column_mins.null_count(), 0); + assert_eq!(value_column_maxes.null_count(), 0); + + // The statistics gathered above are for each row group. We need to + // aggregate them together to compute the overall file row count, + // min and max. + let row_count = row_counts + .iter() + .flatten() // skip nulls (should be none) + .sum::(); + let value_column_min = value_column_mins + .as_primitive::() + .iter() + .flatten() // skip nulls (i.e. min is unknown) + .min() + .unwrap_or_default(); + let value_column_max = value_column_maxes + .as_primitive::() + .iter() + .flatten() // skip nulls (i.e. max is unknown) + .max() + .unwrap_or_default(); + + // sanity check the statistics + assert_eq!(row_count, metadata.file_metadata().num_rows() as u64); + + self.add_row( + file_name, + file_size, + row_count, + value_column_min, + value_column_max, + ); + Ok(()) + } + + /// Add an entry for a single new file to the in progress index + fn add_row( + &mut self, + file_name: impl Into, + file_size: u64, + row_count: u64, + value_column_min: i32, + value_column_max: i32, + ) { + self.filenames.push(file_name.into()); + self.file_sizes.push(file_size); + self.row_counts.push(row_count); + self.value_column_mins.push(value_column_min); + self.value_column_maxs.push(value_column_max); + } + + /// Build the index from the files added + fn build(self) -> Result { + let Some(file_schema) = self.file_schema else { + return Err(internal_datafusion_err!("No files added to index")); + }; + + let file_name: ArrayRef = Arc::new(StringArray::from(self.filenames)); + let file_size: ArrayRef = Arc::new(UInt64Array::from(self.file_sizes)); + let row_count: ArrayRef = Arc::new(UInt64Array::from(self.row_counts)); + let value_column_min: ArrayRef = + Arc::new(Int32Array::from(self.value_column_mins)); + let value_column_max: ArrayRef = + Arc::new(Int32Array::from(self.value_column_maxs)); + + let index = RecordBatch::try_from_iter(vec![ + ("file_name", file_name), + ("file_size", file_size), + ("row_count", row_count), + ("value_column_min", value_column_min), + ("value_column_max", value_column_max), + ])?; + + Ok(ParquetMetadataIndex { + file_schema, + index, + last_num_pruned: AtomicUsize::new(0), + }) + } +} + +/// Return a list of the directory entries in the given directory, sorted by name +fn read_dir(dir: &Path) -> Result> { + let mut files = dir + .read_dir() + .map_err(|e| { + DataFusionError::from(e).context(format!("Error reading directory {dir:?}")) + })? 
+ .map(|entry| { + entry.map_err(|e| { + DataFusionError::from(e) + .context(format!("Error reading directory entry in {dir:?}")) + }) + }) + .collect::>>()?; + files.sort_by_key(|entry| entry.file_name()); + Ok(files) +} + +/// Demonstration Data +/// +/// Makes a directory with three parquet files +/// +/// The schema of the files is +/// * file_name (string) +/// * value (int32) +/// +/// The files are as follows: +/// * file1.parquet (values 0..100) +/// * file2.parquet (values 100..200) +/// * file3.parquet (values 200..3000) +struct DemoData { + tmpdir: TempDir, +} + +impl DemoData { + fn try_new() -> Result { + let tmpdir = TempDir::new()?; + make_demo_file(tmpdir.path().join("file1.parquet"), 0..100)?; + make_demo_file(tmpdir.path().join("file2.parquet"), 100..200)?; + make_demo_file(tmpdir.path().join("file3.parquet"), 200..3000)?; + + Ok(Self { tmpdir }) + } + + fn path(&self) -> PathBuf { + self.tmpdir.path().into() + } +} + +/// Creates a new parquet file at the specified path. +/// +/// The `value` column increases sequentially from `min_value` to `max_value` +/// with the following schema: +/// +/// * file_name: Utf8 +/// * value: Int32 +fn make_demo_file(path: impl AsRef, value_range: Range) -> Result<()> { + let path = path.as_ref(); + let file = File::create(path)?; + let filename = path + .file_name() + .ok_or_else(|| internal_datafusion_err!("No filename"))? + .to_str() + .ok_or_else(|| internal_datafusion_err!("Invalid filename"))?; + + let num_values = value_range.len(); + let file_names = + StringArray::from_iter_values(std::iter::repeat(&filename).take(num_values)); + let values = Int32Array::from_iter_values(value_range); + let batch = RecordBatch::try_from_iter(vec![ + ("file_name", Arc::new(file_names) as ArrayRef), + ("value", Arc::new(values) as ArrayRef), + ])?; + + let schema = batch.schema(); + + // write the actual values to the file + let props = None; + let mut writer = ArrowWriter::try_new(file, schema, props)?; + writer.write(&batch)?; + writer.finish()?; + + Ok(()) +} From 1a4655e64d5e09b55be674e5fc788160eab72e76 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 24 May 2024 05:46:08 -0400 Subject: [PATCH 8/8] Simplify `parquet_index` example --- datafusion-examples/examples/parquet_index.rs | 56 ++++++------------- 1 file changed, 17 insertions(+), 39 deletions(-) diff --git a/datafusion-examples/examples/parquet_index.rs b/datafusion-examples/examples/parquet_index.rs index 0ed337611389..a365a5843f19 100644 --- a/datafusion-examples/examples/parquet_index.rs +++ b/datafusion-examples/examples/parquet_index.rs @@ -36,9 +36,8 @@ use datafusion::parquet::arrow::ArrowWriter; use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::*; -use datafusion_common::config::TableParquetOptions; use datafusion_common::{ - internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue, Statistics, + internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue, }; use datafusion_expr::utils::conjunction; use datafusion_expr::{TableProviderFilterPushDown, TableType}; @@ -245,45 +244,24 @@ impl TableProvider for IndexTableProvider { // Transform to the format needed to pass to ParquetExec // Create one file group per file (default to scanning them all in parallel) - let file_groups = files - .into_iter() - .map(|(file_name, file_size)| { - let path = self.dir.join(file_name); - let canonical_path = fs::canonicalize(path)?; - 
Ok(vec![PartitionedFile::new( - canonical_path.display().to_string(), - file_size, - )]) - }) - .collect::>>()?; - - // for now, simply use ParquetExec - // TODO make a builder for FileScanConfig let object_store_url = ObjectStoreUrl::parse("file://")?; - let base_config = FileScanConfig { - object_store_url, - file_schema: self.schema(), - file_groups, - statistics: Statistics::new_unknown(self.index.schema()), - projection: projection.cloned(), - limit, - table_partition_cols: vec![], - output_ordering: vec![], - }; - - let metadata_size_hint = None; - - let table_parquet_options = TableParquetOptions::default(); - - // TODO make a builder for parquet exec - let exec = ParquetExec::new( - base_config, - Some(predicate), - metadata_size_hint, - table_parquet_options, - ); + let mut base_config = FileScanConfig::new(object_store_url, self.schema()) + .with_projection(projection.cloned()) + .with_limit(limit); + + for (file_name, file_size) in files { + let path = self.dir.join(file_name); + let canonical_path = fs::canonicalize(path)?; + base_config = base_config.with_file(PartitionedFile::new( + canonical_path.display().to_string(), + file_size, + )); + } - Ok(Arc::new(exec)) + // Create the actual ParquetExec + Ok(ParquetExec::builder(base_config) + .with_predicate(predicate) + .build_arc()) } /// Tell DataFusion to push filters down to the scan method
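
Taken together, this series replaces the four-argument `ParquetExec::new` constructor with a builder. A minimal sketch of migrating a call site is shown below; the file name, size, and the trivial `lit(true)` predicate are placeholders, and the import paths follow the doc example added with the builder.

```rust
use std::sync::Arc;

use arrow::datatypes::Schema;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_physical_expr::expressions::lit;

fn migrated_parquet_scan() -> Arc<ParquetExec> {
    let file_schema = Arc::new(Schema::empty());
    let scan_config =
        FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
            .with_file(PartitionedFile::new("data.parquet".to_string(), 1024));

    // Before this series (now deprecated):
    // Arc::new(ParquetExec::new(scan_config, Some(lit(true)), None, Default::default()))

    // After: attach the optional pieces explicitly on the builder
    ParquetExec::builder(scan_config)
        .with_predicate(lit(true))
        .build_arc()
}
```

Call sites that need non-default parquet options can start from `ParquetExecBuilder::new_with_options` instead, as the updated `ParquetFormat` and `TestParquetFile` code above does.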