Deprecate ParquetExec::new and update code
alamb committed May 24, 2024
1 parent 09f14c2 commit 313252e
Showing 14 changed files with 93 additions and 120 deletions.
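The commit's theme in one place: the four-argument `ParquetExec::new` gives way to a builder. A minimal sketch of the before/after migration, assuming the DataFusion 39 module paths shown in this diff; the file name and size are placeholders:

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::SchemaRef;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
use datafusion::physical_plan::ExecutionPlan;

fn parquet_scan(schema: SchemaRef) -> Arc<dyn ExecutionPlan> {
    let config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
        .with_file(PartitionedFile::new("data.parquet".to_string(), 1024));

    // Before (deprecated by this commit):
    //     Arc::new(ParquetExec::new(config, None, None, Default::default()))
    //
    // After: optional settings become explicit `with_*` calls on the
    // builder instead of positional `None` arguments.
    ParquetExec::builder(config).build_arc()
}
```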
23 changes: 14 additions & 9 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -31,8 +31,7 @@ use crate::arrow::array::{
 use crate::arrow::datatypes::{DataType, Fields, Schema, SchemaRef};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::datasource::physical_plan::{
-    DefaultSchemaAdapterFactory, FileGroupDisplay, FileSinkConfig, ParquetExec,
-    SchemaAdapterFactory,
+    DefaultSchemaAdapterFactory, FileGroupDisplay, FileSinkConfig, SchemaAdapterFactory,
 };
 use crate::datasource::statistics::{create_max_min_accs, get_col_stats};
 use crate::error::Result;
@@ -75,6 +74,7 @@ use tokio::io::{AsyncWrite, AsyncWriteExt};
 use tokio::sync::mpsc::{self, Receiver, Sender};
 use tokio::task::JoinSet;
 
+use crate::datasource::physical_plan::parquet::ParquetExecBuilder;
 use futures::{StreamExt, TryStreamExt};
 use hashbrown::HashMap;
 use object_store::path::Path;
@@ -253,17 +253,22 @@ impl FileFormat for ParquetFormat {
         conf: FileScanConfig,
         filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
+        let mut builder =
+            ParquetExecBuilder::new_with_options(conf, self.options.clone());
+
         // If pruning is enabled, combine the filters to build the predicate.
        // If pruning is disabled, leave the predicate as None so readers
         // will not prune data based on the statistics.
-        let predicate = self.enable_pruning().then(|| filters.cloned()).flatten();
+        if self.enable_pruning() {
+            if let Some(predicate) = filters.cloned() {
+                builder = builder.with_predicate(predicate);
+            }
+        }
+        if let Some(metadata_size_hint) = self.metadata_size_hint() {
+            builder = builder.with_metadata_size_hint(metadata_size_hint);
+        }
 
-        Ok(Arc::new(ParquetExec::new(
-            conf,
-            predicate,
-            self.metadata_size_hint(),
-            self.options.clone(),
-        )))
+        Ok(builder.build_arc())
     }
 
     async fn create_writer_physical_plan(
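The rewrite above preserves behavior: `bool::then(..).flatten()` and the nested `if`/`if let` both yield a predicate only when pruning is enabled and a filter exists. A standalone sketch of the equivalence, in plain Rust with no DataFusion types:

```rust
fn main() {
    let enable_pruning = true;
    let filters: Option<&str> = Some("c1 > 5");

    // Old form: Option<Option<T>> collapsed with flatten().
    let old = enable_pruning.then(|| filters.clone()).flatten();

    // New form: configure only when both conditions hold.
    let mut new = None;
    if enable_pruning {
        if let Some(predicate) = filters.clone() {
            new = Some(predicate);
        }
    }
    assert_eq!(old, new);
}
```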
44 changes: 24 additions & 20 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -282,6 +282,11 @@ impl ParquetExecBuilder {
         self
     }
 
+    /// Convenience: build an `Arc`d `ParquetExec` from this builder
+    pub fn build_arc(self) -> Arc<ParquetExec> {
+        Arc::new(self.build())
+    }
+
     /// Build a [`ParquetExec`]
     pub fn build(self) -> ParquetExec {
         let Self {
@@ -355,7 +360,10 @@
 
 impl ParquetExec {
     /// Create a new Parquet reader execution plan for the provided file list and schema.
-    //#[deprecated(since = "39.0.0", note = "use builder instead")]
+    #[deprecated(
+        since = "39.0.0",
+        note = "use `ParquetExec::builder` or `ParquetExecBuilder`"
+    )]
     pub fn new(
         base_config: FileScanConfig,
         predicate: Option<Arc<dyn PhysicalExpr>>,
@@ -1157,15 +1165,17 @@ mod tests {
         let predicate = predicate.map(|p| logical2physical(&p, &file_schema));
 
         // prepare the scan
-        let mut parquet_exec = ParquetExec::new(
+        let mut builder = ParquetExec::builder(
             FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
                 .with_file_group(file_group)
                 .with_projection(projection),
-            predicate,
-            None,
-            Default::default(),
         );
 
+        if let Some(predicate) = predicate {
+            builder = builder.with_predicate(predicate);
+        }
+        let mut parquet_exec = builder.build();
+
         if pushdown_predicate {
             parquet_exec = parquet_exec
                 .with_pushdown_filters(true)
@@ -1808,13 +1818,11 @@
         expected_row_num: Option<usize>,
         file_schema: SchemaRef,
     ) -> Result<()> {
-        let parquet_exec = ParquetExec::new(
+        let parquet_exec = ParquetExec::builder(
             FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
                 .with_file_groups(file_groups),
-            None,
-            None,
-            Default::default(),
-        );
+        )
+        .build();
         assert_eq!(
             parquet_exec
                 .properties()
@@ -1910,7 +1918,7 @@
             ),
         ]);
 
-        let parquet_exec = ParquetExec::new(
+        let parquet_exec = ParquetExec::builder(
             FileScanConfig::new(object_store_url, schema.clone())
                 .with_file(partitioned_file)
                 // file has 10 cols so index 12 should be month and 13 should be day
@@ -1927,10 +1935,8 @@
                     false,
                 ),
             ]),
-            None,
-            None,
-            Default::default(),
-        );
+        )
+        .build();
         assert_eq!(
             parquet_exec.cache.output_partitioning().partition_count(),
             1
@@ -1985,13 +1991,11 @@
         };
 
         let file_schema = Arc::new(Schema::empty());
-        let parquet_exec = ParquetExec::new(
+        let parquet_exec = ParquetExec::builder(
             FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
                 .with_file(partitioned_file),
-            None,
-            None,
-            Default::default(),
-        );
+        )
+        .build();
 
         let mut results = parquet_exec.execute(0, state.task_ctx())?;
         let batch = results.next().await.unwrap();
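Because `ParquetExec::new` is marked `#[deprecated]` rather than removed, existing callers keep compiling and get a compiler warning pointing at the builder. A self-contained sketch of the mechanism; the `Scan` types here are illustrative, not DataFusion's:

```rust
pub struct Scan;
pub struct ScanBuilder;

impl Scan {
    #[deprecated(since = "39.0.0", note = "use `Scan::builder` instead")]
    pub fn new() -> Self {
        Scan
    }

    pub fn builder() -> ScanBuilder {
        ScanBuilder
    }
}

impl ScanBuilder {
    pub fn build(self) -> Scan {
        Scan
    }
}

fn main() {
    // Without the allow, rustc warns: use of deprecated associated
    // function `Scan::new`: use `Scan::builder` instead.
    #[allow(deprecated)]
    let _old = Scan::new();

    // The replacement path this commit steers callers toward.
    let _new = Scan::builder().build();
}
```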
@@ -245,16 +245,14 @@ mod tests {
     }
 
     fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
-        Arc::new(ParquetExec::new(
+        ParquetExec::builder(
             FileScanConfig::new(
                 ObjectStoreUrl::parse("test:///").unwrap(),
                 schema.clone(),
             )
             .with_file(PartitionedFile::new("x".to_string(), 100)),
-            None,
-            None,
-            Default::default(),
-        ))
+        )
+        .build_arc()
     }
 
     fn partial_aggregate_exec(
16 changes: 6 additions & 10 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1431,14 +1431,12 @@ pub(crate) mod tests {
     pub(crate) fn parquet_exec_with_sort(
         output_ordering: Vec<Vec<PhysicalSortExpr>>,
     ) -> Arc<ParquetExec> {
-        Arc::new(ParquetExec::new(
+        ParquetExec::builder(
             FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema())
                 .with_file(PartitionedFile::new("x".to_string(), 100))
                 .with_output_ordering(output_ordering),
-            None,
-            None,
-            Default::default(),
-        ))
+        )
+        .build_arc()
     }
 
     fn parquet_exec_multiple() -> Arc<ParquetExec> {
@@ -1449,17 +1447,15 @@
     fn parquet_exec_multiple_sorted(
         output_ordering: Vec<Vec<PhysicalSortExpr>>,
     ) -> Arc<ParquetExec> {
-        Arc::new(ParquetExec::new(
+        ParquetExec::builder(
             FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema())
                 .with_file_groups(vec![
                     vec![PartitionedFile::new("x".to_string(), 100)],
                     vec![PartitionedFile::new("y".to_string(), 100)],
                 ])
                 .with_output_ordering(output_ordering),
-            None,
-            None,
-            Default::default(),
-        ))
+        )
+        .build_arc()
     }
 
     fn csv_exec() -> Arc<CsvExec> {
16 changes: 6 additions & 10 deletions datafusion/core/src/physical_optimizer/test_utils.rs
@@ -274,13 +274,11 @@ pub fn sort_preserving_merge_exec(
 
 /// Create a non-sorted parquet exec
 pub fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
-    Arc::new(ParquetExec::new(
+    ParquetExec::builder(
         FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema.clone())
             .with_file(PartitionedFile::new("x".to_string(), 100)),
-        None,
-        None,
-        Default::default(),
-    ))
+    )
+    .build_arc()
 }
 
 // Create a sorted parquet exec
@@ -290,14 +288,12 @@ pub fn parquet_exec_sorted(
 ) -> Arc<dyn ExecutionPlan> {
     let sort_exprs = sort_exprs.into_iter().collect();
 
-    Arc::new(ParquetExec::new(
+    ParquetExec::builder(
         FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema.clone())
             .with_file(PartitionedFile::new("x".to_string(), 100))
             .with_output_ordering(vec![sort_exprs]),
-        None,
-        None,
-        Default::default(),
-    ))
+    )
+    .build_arc()
 }
 
 pub fn union_exec(input: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn ExecutionPlan> {
22 changes: 10 additions & 12 deletions datafusion/core/src/test_util/parquet.rs
@@ -37,6 +37,7 @@ use crate::physical_plan::metrics::MetricsSet;
 use crate::physical_plan::ExecutionPlan;
 use crate::prelude::{Expr, SessionConfig, SessionContext};
 
+use crate::datasource::physical_plan::parquet::ParquetExecBuilder;
 use object_store::path::Path;
 use object_store::ObjectMeta;
 use parquet::arrow::ArrowWriter;
@@ -163,22 +164,19 @@ impl TestParquetFile {
             let filter = simplifier.coerce(filter, &df_schema).unwrap();
             let physical_filter_expr =
                 create_physical_expr(&filter, &df_schema, &ExecutionProps::default())?;
-            let parquet_exec = Arc::new(ParquetExec::new(
-                scan_config,
-                Some(physical_filter_expr.clone()),
-                None,
-                parquet_options,
-            ));
+
+            let parquet_exec =
+                ParquetExecBuilder::new_with_options(scan_config, parquet_options)
+                    .with_predicate(physical_filter_expr.clone())
+                    .build_arc();
 
             let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
             Ok(exec)
         } else {
-            Ok(Arc::new(ParquetExec::new(
-                scan_config,
-                None,
-                None,
-                parquet_options,
-            )))
+            Ok(
+                ParquetExecBuilder::new_with_options(scan_config, parquet_options)
+                    .build_arc(),
+            )
         }
     }
 
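A detail worth calling out in this hunk: the predicate handed to the builder drives row-group and page pruning and is not guaranteed to drop every non-matching row, which is why the code still wraps the scan in a `FilterExec` over the same expression. A sketch of that layering, with module paths assumed from DataFusion 39:

```rust
use std::sync::Arc;

use datafusion::config::TableParquetOptions;
use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion::error::Result;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::{ExecutionPlan, PhysicalExpr};

/// Prune at scan time, then re-apply the predicate exactly.
fn exact_filtered_scan(
    scan_config: FileScanConfig,
    options: TableParquetOptions,
    predicate: Arc<dyn PhysicalExpr>,
) -> Result<Arc<dyn ExecutionPlan>> {
    let parquet_exec = ParquetExecBuilder::new_with_options(scan_config, options)
        // Skips row groups/pages that cannot match; may still emit
        // rows the predicate would reject.
        .with_predicate(Arc::clone(&predicate))
        .build_arc();

    // Row-level filtering on top of the pruned scan.
    Ok(Arc::new(FilterExec::try_new(predicate, parquet_exec)?))
}
```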
6 changes: 2 additions & 4 deletions datafusion/core/tests/parquet/custom_reader.rs
@@ -75,17 +75,15 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() {
         .collect();
 
     // prepare the scan
-    let parquet_exec = ParquetExec::new(
+    let parquet_exec = ParquetExec::builder(
         FileScanConfig::new(
             // just any url that doesn't point to in memory object store
             ObjectStoreUrl::local_filesystem(),
             file_schema,
         )
         .with_file_group(file_group),
-        None,
-        None,
-        Default::default(),
     )
+    .build()
     .with_parquet_file_reader_factory(Arc::new(InMemoryParquetFileReaderFactory(
         Arc::clone(&in_memory_object_store),
     )));
11 changes: 5 additions & 6 deletions datafusion/core/tests/parquet/page_pruning.rs
@@ -70,13 +70,12 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> ParquetExec {
     let execution_props = ExecutionProps::new();
     let predicate = create_physical_expr(&filter, &df_schema, &execution_props).unwrap();
 
-    let parquet_exec = ParquetExec::new(
+    ParquetExec::builder(
         FileScanConfig::new(object_store_url, schema).with_file(partitioned_file),
-        Some(predicate),
-        None,
-        Default::default(),
-    );
-    parquet_exec.with_enable_page_index(true)
+    )
+    .with_predicate(predicate)
+    .build()
+    .with_enable_page_index(true)
 }
 
 #[tokio::test]
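The page-pruning hunk shows the split this API change settles into: what defines the scan (config, predicate, metadata size hint) goes through the builder, while per-run tuning such as `with_enable_page_index` or `with_pushdown_filters` is applied to the built `ParquetExec`. A hedged sketch of that split; the schema and file are placeholders and module paths are assumed from DataFusion 39:

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};

fn tuned_scan() -> ParquetExec {
    let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
    let config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
        .with_file(PartitionedFile::new("data.parquet".to_string(), 1024));

    // Scan-defining inputs flow through the builder...
    ParquetExec::builder(config)
        .build()
        // ...while execution tuning is layered onto the built plan, as
        // the page_pruning and predicate-pushdown tests above do.
        .with_enable_page_index(true)
        .with_pushdown_filters(true)
}
```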
6 changes: 2 additions & 4 deletions datafusion/core/tests/parquet/schema_adapter.rs
@@ -82,13 +82,11 @@ async fn can_override_schema_adapter() {
     let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
 
     // prepare the scan
-    let parquet_exec = ParquetExec::new(
+    let parquet_exec = ParquetExec::builder(
         FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
             .with_file(partitioned_file),
-        None,
-        None,
-        Default::default(),
     )
+    .build()
     .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {}));
 
     let session_ctx = SessionContext::new();
16 changes: 6 additions & 10 deletions datafusion/core/tests/parquet/schema_coercion.rs
@@ -59,13 +59,11 @@ async fn multi_parquet_coercion() {
         Field::new("c2", DataType::Int32, true),
         Field::new("c3", DataType::Float64, true),
     ]));
-    let parquet_exec = ParquetExec::new(
+    let parquet_exec = ParquetExec::builder(
         FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
             .with_file_group(file_group),
-        None,
-        None,
-        Default::default(),
-    );
+    )
+    .build();
 
     let session_ctx = SessionContext::new();
     let task_ctx = session_ctx.task_ctx();
@@ -115,14 +113,12 @@
         Field::new("c2", DataType::Int32, true),
         Field::new("c3", DataType::Float64, true),
     ]));
-    let parquet_exec = ParquetExec::new(
+    let parquet_exec = ParquetExec::builder(
         FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
             .with_file_group(file_group)
             .with_projection(Some(vec![1, 0, 2])),
-        None,
-        None,
-        Default::default(),
-    );
+    )
+    .build();
 
     let session_ctx = SessionContext::new();
     let task_ctx = session_ctx.task_ctx();
11 changes: 5 additions & 6 deletions datafusion/proto/src/physical_plan/mod.rs
@@ -225,12 +225,11 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                         )
                     })
                     .transpose()?;
-                Ok(Arc::new(ParquetExec::new(
-                    base_config,
-                    predicate,
-                    None,
-                    Default::default(),
-                )))
+                let mut builder = ParquetExec::builder(base_config);
+                if let Some(predicate) = predicate {
+                    builder = builder.with_predicate(predicate)
+                }
+                Ok(builder.build_arc())
             }
             PhysicalPlanType::AvroScan(scan) => {
                 Ok(Arc::new(AvroExec::new(parse_protobuf_file_scan_config(
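Two equivalent entry points appear across the files above: `ParquetExec::builder(config)` when default options suffice, and `ParquetExecBuilder::new_with_options(config, options)` when `TableParquetOptions` are already in hand, as in `ParquetFormat::create_physical_plan` and `TestParquetFile`. A sketch of when each fits, with module paths assumed from DataFusion 39:

```rust
use datafusion::config::TableParquetOptions;
use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};

fn scan(config: FileScanConfig, options: Option<TableParquetOptions>) -> ParquetExec {
    match options {
        // Default options: the shorthand used by most call sites in this diff.
        None => ParquetExec::builder(config).build(),
        // Pre-existing options: the form used where TableParquetOptions
        // is threaded through.
        Some(opts) => ParquetExecBuilder::new_with_options(config, opts).build(),
    }
}
```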