Coerce BinaryView/Utf8View to LargeBinary/LargeUtf8 on output. (#12271)
* chore(12119): add expand_views_at_output optimizer config option, default to false

* feat(12119): coerce output based upon expand_views_at_output

* test(12119): test coercion of view types to non-view types on output.

* chore(12119): config.md for new config option

* refactor(12119): code cleanup

* test(12119): more clearly define scenarios in viewtype-coercion test coverage, and expand with PlanD scenario
wiedld authored Sep 9, 2024
1 parent 6316849 commit 391f5cb
Showing 6 changed files with 245 additions and 7 deletions.
5 changes: 5 additions & 0 deletions datafusion/common/src/config.rs
@@ -606,6 +606,11 @@ config_namespace! {

/// When set to true, the optimizer will not attempt to convert Union to Interleave
pub prefer_existing_union: bool, default = false

/// When set to true, if the returned type is a view type
/// then the output will be coerced to a non-view.
/// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
pub expand_views_at_output: bool, default = false
}
}
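
For illustration only (not part of this commit): enabling the new option from application code. This is a minimal sketch assuming the usual `SessionConfig`/`SessionContext` API; the table name, file path, and the assumption that column `a` arrives as `Utf8View` are made up for the example.

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // `expand_views_at_output` defaults to false; turn it on for this session.
    let config = SessionConfig::new()
        .set_bool("datafusion.optimizer.expand_views_at_output", true);
    let ctx = SessionContext::new_with_config(config);

    // Hypothetical source: a table `t` whose column `a` comes back as Utf8View.
    ctx.register_parquet("t", "strings.parquet", ParquetReadOptions::default())
        .await?;

    // With the flag on, any Utf8View/BinaryView columns in the final output schema
    // should be cast to LargeUtf8/LargeBinary at the root of the plan.
    let df = ctx.sql("SELECT a FROM t").await?;
    df.show().await?;
    Ok(())
}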

2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/plan.rs
@@ -1393,7 +1393,7 @@ impl LogicalPlan {
/// referenced expressions into columns.
///
/// See also: [`crate::utils::columnize_expr`]
pub(crate) fn columnized_output_exprs(&self) -> Result<Vec<(&Expr, Column)>> {
pub fn columnized_output_exprs(&self) -> Result<Vec<(&Expr, Column)>> {
match self {
LogicalPlan::Aggregate(aggregate) => Ok(aggregate
.output_expressions()?
231 changes: 225 additions & 6 deletions datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -21,15 +21,15 @@ use std::sync::Arc;

use itertools::izip;

use arrow::datatypes::{DataType, Field, IntervalUnit};
use arrow::datatypes::{DataType, Field, IntervalUnit, Schema};

use crate::analyzer::AnalyzerRule;
use crate::utils::NamePreserver;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
use datafusion_common::{
exec_err, internal_err, not_impl_err, plan_datafusion_err, plan_err, Column,
DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue,
DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, TableReference,
};
use datafusion_expr::expr::{
self, Alias, Between, BinaryExpr, Case, Exists, InList, InSubquery, Like,
@@ -66,19 +66,39 @@ impl TypeCoercion {
}
}

/// Coerce output schema based upon optimizer config.
fn coerce_output(plan: LogicalPlan, config: &ConfigOptions) -> Result<LogicalPlan> {
if !config.optimizer.expand_views_at_output {
return Ok(plan);
}

let outer_refs = plan.expressions();
if outer_refs.is_empty() {
return Ok(plan);
}

if let Some(dfschema) = transform_schema_to_nonview(plan.schema()) {
coerce_plan_expr_for_schema(plan, &dfschema?)
} else {
Ok(plan)
}
}

impl AnalyzerRule for TypeCoercion {
fn name(&self) -> &str {
"type_coercion"
}

fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result<LogicalPlan> {
fn analyze(&self, plan: LogicalPlan, config: &ConfigOptions) -> Result<LogicalPlan> {
let empty_schema = DFSchema::empty();

// recurse
let transformed_plan = plan
.transform_up_with_subqueries(|plan| analyze_internal(&empty_schema, plan))?
.data;

Ok(transformed_plan)
// finish
coerce_output(transformed_plan, config)
}
}
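
// Illustrative sketch (not part of this commit): driving only the TypeCoercion rule
// with the new flag enabled, mirroring the `assert_analyzed_plan_with_config_eq`
// helper added in datafusion/optimizer/src/test/mod.rs later in this diff. The
// function name is made up; `plan` is any existing `LogicalPlan`.
#[allow(dead_code)]
fn coerce_views_at_output_sketch(plan: LogicalPlan) -> Result<LogicalPlan> {
    use crate::analyzer::Analyzer;

    let rule: Arc<dyn AnalyzerRule + Send + Sync> = Arc::new(TypeCoercion::new());
    let mut options = ConfigOptions::default();
    options.optimizer.expand_views_at_output = true;
    // View-typed columns in the output schema should end up cast to their
    // Large* counterparts at the plan root.
    Analyzer::with_rules(vec![rule]).execute_and_check(plan, &options, |_, _| {})
}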

@@ -514,6 +534,55 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> {
}
}

/// Transform a schema to use non-view types for Utf8View and BinaryView
fn transform_schema_to_nonview(dfschema: &DFSchemaRef) -> Option<Result<DFSchema>> {
let metadata = dfschema.as_arrow().metadata.clone();
let mut transformed = false;

let (qualifiers, transformed_fields): (Vec<Option<TableReference>>, Vec<Arc<Field>>) =
dfschema
.iter()
.map(|(qualifier, field)| match field.data_type() {
DataType::Utf8View => {
transformed = true;
(
qualifier.cloned() as Option<TableReference>,
Arc::new(Field::new(
field.name(),
DataType::LargeUtf8,
field.is_nullable(),
)),
)
}
DataType::BinaryView => {
transformed = true;
(
qualifier.cloned() as Option<TableReference>,
Arc::new(Field::new(
field.name(),
DataType::LargeBinary,
field.is_nullable(),
)),
)
}
_ => (
qualifier.cloned() as Option<TableReference>,
Arc::clone(field),
),
})
.unzip();

if !transformed {
return None;
}

let schema = Schema::new_with_metadata(transformed_fields, metadata);
Some(DFSchema::from_field_specific_qualified_schema(
qualifiers,
&Arc::new(schema),
))
}
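
// Illustrative sketch (not part of this commit): exercising transform_schema_to_nonview
// directly. The module and test names are made up for the example.
#[cfg(test)]
mod transform_schema_to_nonview_sketch {
    use super::*;

    #[test]
    fn maps_view_fields_to_large_variants() -> Result<()> {
        let schema = Schema::new(vec![
            Field::new("s", DataType::Utf8View, true),
            Field::new("b", DataType::BinaryView, false),
            Field::new("i", DataType::Int64, true),
        ]);
        let dfschema: DFSchemaRef = Arc::new(DFSchema::try_from(schema)?);

        // Some(..) because at least one field is a view type; a schema with no view
        // types returns None and the plan is left untouched.
        let transformed = transform_schema_to_nonview(&dfschema)
            .expect("schema contains view types")?;
        assert_eq!(transformed.field(0).data_type(), &DataType::LargeUtf8);
        assert_eq!(transformed.field(1).data_type(), &DataType::LargeBinary);
        assert_eq!(transformed.field(2).data_type(), &DataType::Int64);
        Ok(())
    }
}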

/// Casts the given `value` to `target_type`. Note that this function
/// only considers `Null` or `Utf8` values.
fn coerce_scalar(target_type: &DataType, value: &ScalarValue) -> Result<ScalarValue> {
@@ -935,10 +1004,11 @@ mod test {
use arrow::datatypes::DataType::Utf8;
use arrow::datatypes::{DataType, Field, TimeUnit};

use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{TransformedResult, TreeNode};
use datafusion_common::{DFSchema, DFSchemaRef, Result, ScalarValue};
use datafusion_expr::expr::{self, InSubquery, Like, ScalarFunction};
use datafusion_expr::logical_plan::{EmptyRelation, Projection};
use datafusion_expr::logical_plan::{EmptyRelation, Projection, Sort};
use datafusion_expr::test::function_stub::avg_udaf;
use datafusion_expr::{
cast, col, create_udaf, is_true, lit, AccumulatorFactoryFunction, AggregateUDF,
@@ -951,7 +1021,7 @@
use crate::analyzer::type_coercion::{
coerce_case_expression, TypeCoercion, TypeCoercionRewriter,
};
use crate::test::assert_analyzed_plan_eq;
use crate::test::{assert_analyzed_plan_eq, assert_analyzed_plan_with_config_eq};

fn empty() -> Arc<LogicalPlan> {
Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
@@ -982,6 +1052,155 @@
assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), plan, expected)
}

fn coerce_on_output_if_viewtype(plan: LogicalPlan, expected: &str) -> Result<()> {
let mut options = ConfigOptions::default();
options.optimizer.expand_views_at_output = true;

assert_analyzed_plan_with_config_eq(
options,
Arc::new(TypeCoercion::new()),
plan.clone(),
expected,
)
}

fn do_not_coerce_on_output(plan: LogicalPlan, expected: &str) -> Result<()> {
assert_analyzed_plan_with_config_eq(
ConfigOptions::default(),
Arc::new(TypeCoercion::new()),
plan.clone(),
expected,
)
}

#[test]
fn coerce_utf8view_output() -> Result<()> {
// Plan A
// scenario: outermost utf8view projection
let expr = col("a");
let empty = empty_with_type(DataType::Utf8View);
let plan = LogicalPlan::Projection(Projection::try_new(
vec![expr.clone()],
Arc::clone(&empty),
)?);
// Plan A: no coerce
let if_not_coerced = "Projection: a\n EmptyRelation";
do_not_coerce_on_output(plan.clone(), if_not_coerced)?;
// Plan A: coerce requested: Utf8View => LargeUtf8
let if_coerced = "Projection: CAST(a AS LargeUtf8)\n EmptyRelation";
coerce_on_output_if_viewtype(plan.clone(), if_coerced)?;

// Plan B
// scenario: outermost bool projection
let bool_expr = col("a").lt(lit("foo"));
let bool_plan = LogicalPlan::Projection(Projection::try_new(
vec![bool_expr],
Arc::clone(&empty),
)?);
// Plan B: no coerce
let if_not_coerced =
"Projection: a < CAST(Utf8(\"foo\") AS Utf8View)\n EmptyRelation";
do_not_coerce_on_output(bool_plan.clone(), if_not_coerced)?;
// Plan B: coerce requested: no coercion applied
let if_coerced = if_not_coerced;
coerce_on_output_if_viewtype(bool_plan, if_coerced)?;

// Plan C
// scenario: with a non-projection root logical plan node
let sort_expr = expr.sort(true, true);
let sort_plan = LogicalPlan::Sort(Sort {
expr: vec![sort_expr],
input: Arc::new(plan),
fetch: None,
});
// Plan C: no coerce
let if_not_coerced =
"Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation";
do_not_coerce_on_output(sort_plan.clone(), if_not_coerced)?;
// Plan C: coerce requested: Utf8View => LargeUtf8
let if_coerced = "Projection: CAST(a AS LargeUtf8)\n Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation";
coerce_on_output_if_viewtype(sort_plan.clone(), if_coerced)?;

// Plan D
// scenario: two layers of projections with view types
let plan = LogicalPlan::Projection(Projection::try_new(
vec![col("a")],
Arc::new(sort_plan),
)?);
// Plan D: no coerce
let if_not_coerced = "Projection: a\n Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation";
do_not_coerce_on_output(plan.clone(), if_not_coerced)?;
// Plan D: coerce requested: Utf8View => LargeUtf8 only on outermost
let if_coerced = "Projection: CAST(a AS LargeUtf8)\n Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation";
coerce_on_output_if_viewtype(plan.clone(), if_coerced)?;

Ok(())
}

#[test]
fn coerce_binaryview_output() -> Result<()> {
// Plan A
// scenario: outermost binaryview projection
let expr = col("a");
let empty = empty_with_type(DataType::BinaryView);
let plan = LogicalPlan::Projection(Projection::try_new(
vec![expr.clone()],
Arc::clone(&empty),
)?);
// Plan A: no coerce
let if_not_coerced = "Projection: a\n EmptyRelation";
do_not_coerce_on_output(plan.clone(), if_not_coerced)?;
// Plan A: coerce requested: BinaryView => LargeBinary
let if_coerced = "Projection: CAST(a AS LargeBinary)\n EmptyRelation";
coerce_on_output_if_viewtype(plan.clone(), if_coerced)?;

// Plan B
// scenario: outermost bool projection
let bool_expr = col("a").lt(lit(vec![8, 1, 8, 1]));
let bool_plan = LogicalPlan::Projection(Projection::try_new(
vec![bool_expr],
Arc::clone(&empty),
)?);
// Plan B: no coerce
let if_not_coerced =
"Projection: a < CAST(Binary(\"8,1,8,1\") AS BinaryView)\n EmptyRelation";
do_not_coerce_on_output(bool_plan.clone(), if_not_coerced)?;
// Plan B: coerce requested: no coercion applied
let if_coerced = if_not_coerced;
coerce_on_output_if_viewtype(bool_plan, if_coerced)?;

// Plan C
// scenario: with a non-projection root logical plan node
let sort_expr = expr.sort(true, true);
let sort_plan = LogicalPlan::Sort(Sort {
expr: vec![sort_expr],
input: Arc::new(plan),
fetch: None,
});
// Plan C: no coerce
let if_not_coerced =
"Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation";
do_not_coerce_on_output(sort_plan.clone(), if_not_coerced)?;
// Plan C: coerce requested: BinaryView => LargeBinary
let if_coerced = "Projection: CAST(a AS LargeBinary)\n Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation";
coerce_on_output_if_viewtype(sort_plan.clone(), if_coerced)?;

// Plan D
// scenario: two layers of projections with view types
let plan = LogicalPlan::Projection(Projection::try_new(
vec![col("a")],
Arc::new(sort_plan),
)?);
// Plan D: no coerce
let if_not_coerced = "Projection: a\n Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation";
do_not_coerce_on_output(plan.clone(), if_not_coerced)?;
// Plan D: coerce requested: BinaryView => LargeBinary only on outermost
let if_coerced = "Projection: CAST(a AS LargeBinary)\n Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation";
coerce_on_output_if_viewtype(plan.clone(), if_coerced)?;

Ok(())
}

#[test]
fn nested_case() -> Result<()> {
let expr = col("a").lt(lit(2_u32));
11 changes: 11 additions & 0 deletions datafusion/optimizer/src/test/mod.rs
@@ -114,6 +114,17 @@ pub fn assert_analyzed_plan_eq(
expected: &str,
) -> Result<()> {
let options = ConfigOptions::default();
assert_analyzed_plan_with_config_eq(options, rule, plan, expected)?;

Ok(())
}

pub fn assert_analyzed_plan_with_config_eq(
options: ConfigOptions,
rule: Arc<dyn AnalyzerRule + Send + Sync>,
plan: LogicalPlan,
expected: &str,
) -> Result<()> {
let analyzed_plan =
Analyzer::with_rules(vec![rule]).execute_and_check(plan, &options, |_, _| {})?;
let formatted_plan = format!("{analyzed_plan}");
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
@@ -225,6 +225,7 @@ datafusion.optimizer.default_filter_selectivity 20
datafusion.optimizer.enable_distinct_aggregation_soft_limit true
datafusion.optimizer.enable_round_robin_repartition true
datafusion.optimizer.enable_topk_aggregation true
datafusion.optimizer.expand_views_at_output false
datafusion.optimizer.filter_null_join_keys false
datafusion.optimizer.hash_join_single_partition_threshold 1048576
datafusion.optimizer.hash_join_single_partition_threshold_rows 131072
@@ -314,6 +315,7 @@ datafusion.optimizer.default_filter_selectivity 20 The default filter selectivit
datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read.
datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores
datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible
datafusion.optimizer.expand_views_at_output false When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
datafusion.optimizer.filter_null_join_keys false When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down.
datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition
datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
Expand Up @@ -111,6 +111,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.optimizer.hash_join_single_partition_threshold_rows | 131072 | The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition |
| datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). |
| datafusion.optimizer.prefer_existing_union | false | When set to true, the optimizer will not attempt to convert Union to Interleave |
| datafusion.optimizer.expand_views_at_output | false | When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. |
| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans |
| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans |
| datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |
