Add Extension Type / Metadata support for Scalar UDFs #15646

Merged
merged 26 commits into from
Apr 29, 2025
39d1a72
Add in plumbing to pass around metadata for physical expressions
timsaucer Apr 8, 2025
9bcd4b5
Adding argument metadata to scalar argument struct
timsaucer Apr 8, 2025
ea561b0
Since everywhere we use this we immediately clone, go ahead and retur…
timsaucer Apr 8, 2025
8daa356
Cargo fmt
timsaucer Apr 8, 2025
a2d5f9e
Benchmarks required args_metadata in tests
timsaucer Apr 8, 2025
a3514de
Clippy warnings
timsaucer Apr 8, 2025
4e3b7bc
Switching over to passing Field around instead of metadata so we can …
timsaucer Apr 9, 2025
281a83e
Switching return_type_from_args to return_field_from_args
timsaucer Apr 15, 2025
03ddfe7
Updates to unit tests for switching to field instead of data_type
timsaucer Apr 15, 2025
58933df
Resolve unit test issues
timsaucer Apr 15, 2025
6924e4e
Update after rebase on main
timsaucer Apr 16, 2025
68f4356
GetFieldFunc should return the field it finds instead of creating a n…
timsaucer Apr 16, 2025
d6af7e3
Get metadata from scalar functions
timsaucer Apr 17, 2025
07b7ec8
Change expr_schema to use to_field primarily instead of individual ca…
timsaucer Apr 17, 2025
caad021
Scalar function arguments should take return field instead of return …
timsaucer Apr 18, 2025
9aa5227
subquery should just get the field from below and not lose potential …
timsaucer Apr 18, 2025
2fab67b
Update comment
timsaucer Apr 18, 2025
02d6f47
Remove output_field now that we've determined it using return_field_f…
timsaucer Apr 21, 2025
871c382
Change name to_field to field_from_column to be more consistent with …
timsaucer Apr 21, 2025
c439224
Minor moving around of the explicit lifetimes in the struct definition
timsaucer Apr 21, 2025
d69deee
Change physical expression to require to output a field which require…
timsaucer Apr 21, 2025
317ed22
Change name from output_field to return_field to be more consistent
timsaucer Apr 21, 2025
6481b3e
Update migration guide for DF48 with user defined functions
timsaucer Apr 21, 2025
ef6c58c
Merge branch 'main' into feat/physical_expr_evaluate_metadata
timsaucer Apr 21, 2025
cddb52a
Whitespace
timsaucer Apr 21, 2025
e4d5846
Docstring correction
timsaucer Apr 21, 2025
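
The commit messages above describe moving from passing data types to passing whole fields, so that metadata can travel with an expression. The following is a minimal sketch of that idea using only the standard library. `SketchField`, `sketch_return_type`, `sketch_return_field`, `sample_input`, and the `unit` metadata key are hypothetical names invented for illustration; they are not DataFusion or Arrow APIs, and the real signatures in this PR may differ.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for an Arrow field.
/// This is NOT `arrow_schema::Field`; it only models the parts discussed here.
#[derive(Debug, Clone, PartialEq)]
struct SketchField {
    name: String,
    data_type: String,
    nullable: bool,
    metadata: HashMap<String, String>,
}

/// Type-only planning: the function sees data types and nothing else,
/// so metadata attached to the inputs cannot reach the output.
fn sketch_return_type(arg_types: &[&str]) -> Option<String> {
    arg_types.first().map(|t| t.to_string())
}

/// Field-based planning: the function sees whole fields, so it can carry
/// the first argument's metadata through to its output field.
/// Returns `None` when called with no arguments.
fn sketch_return_field(func_name: &str, arg_fields: &[SketchField]) -> Option<SketchField> {
    let first = arg_fields.first()?;
    Some(SketchField {
        name: func_name.to_string(),
        data_type: first.data_type.clone(),
        // The output may be null if any input may be null.
        nullable: arg_fields.iter().any(|f| f.nullable),
        metadata: first.metadata.clone(),
    })
}

/// Builds a sample input field tagged with a made-up metadata entry.
fn sample_input() -> SketchField {
    let mut metadata = HashMap::new();
    metadata.insert("unit".to_string(), "meters".to_string());
    SketchField {
        name: "distance".to_string(),
        data_type: "Float64".to_string(),
        nullable: false,
        metadata,
    }
}
```

Calling `sketch_return_field("my_udf", &[sample_input()])` yields a field that still carries the `unit` entry, whereas `sketch_return_type` can only report `Float64`.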
1 change: 1 addition & 0 deletions Cargo.lock

54 changes: 26 additions & 28 deletions datafusion/common/src/dfschema.rs
@@ -472,7 +472,7 @@ impl DFSchema {
  let matches = self.qualified_fields_with_unqualified_name(name);
  match matches.len() {
  0 => Err(unqualified_field_not_found(name, self)),
- 1 => Ok((matches[0].0, (matches[0].1))),
+ 1 => Ok((matches[0].0, matches[0].1)),
  _ => {
  // When `matches` size > 1, it doesn't necessarily mean an `ambiguous name` problem.
  // Because name may generate from Alias/... . It means that it don't own qualifier.
@@ -515,14 +515,6 @@ impl DFSchema {
  Ok(self.field(idx))
  }

- /// Find the field with the given qualified column
- pub fn field_from_column(&self, column: &Column) -> Result<&Field> {
- match &column.relation {
- Some(r) => self.field_with_qualified_name(r, &column.name),
- None => self.field_with_unqualified_name(&column.name),
- }
- }
-
  /// Find the field with the given qualified column
  pub fn qualified_field_from_column(
  &self,
@@ -969,16 +961,28 @@ impl Display for DFSchema {
  /// widely used in the DataFusion codebase.
  pub trait ExprSchema: std::fmt::Debug {
  /// Is this column reference nullable?
- fn nullable(&self, col: &Column) -> Result<bool>;
+ fn nullable(&self, col: &Column) -> Result<bool> {

Review comment (Contributor):
It seems like we could (perhaps as a follow-on ticket) deprecate all the other methods on ExprSchema, as to_field supersedes all of them. It would make a good first issue ticket.

+ Ok(self.field_from_column(col)?.is_nullable())
+ }

  /// What is the datatype of this column?
- fn data_type(&self, col: &Column) -> Result<&DataType>;
+ fn data_type(&self, col: &Column) -> Result<&DataType> {
+ Ok(self.field_from_column(col)?.data_type())
+ }

  /// Returns the column's optional metadata.
- fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>>;
+ fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
+ Ok(self.field_from_column(col)?.metadata())
+ }

  /// Return the column's datatype and nullability
- fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)>;
+ fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
+ let field = self.field_from_column(col)?;
+ Ok((field.data_type(), field.is_nullable()))
+ }
+
+ // Return the column's field
+ fn field_from_column(&self, col: &Column) -> Result<&Field>;
  }

  // Implement `ExprSchema` for `Arc<DFSchema>`
@@ -998,24 +1002,18 @@ impl<P: AsRef<DFSchema> + std::fmt::Debug> ExprSchema for P {
  fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
  self.as_ref().data_type_and_nullable(col)
  }
- }

- impl ExprSchema for DFSchema {
- fn nullable(&self, col: &Column) -> Result<bool> {
- Ok(self.field_from_column(col)?.is_nullable())
- }
-
- fn data_type(&self, col: &Column) -> Result<&DataType> {
- Ok(self.field_from_column(col)?.data_type())
- }
-
- fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
- Ok(self.field_from_column(col)?.metadata())
+ fn field_from_column(&self, col: &Column) -> Result<&Field> {
+ self.as_ref().field_from_column(col)
  }
+ }

- fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
- let field = self.field_from_column(col)?;
- Ok((field.data_type(), field.is_nullable()))
+ impl ExprSchema for DFSchema {
+ fn field_from_column(&self, col: &Column) -> Result<&Field> {
+ match &col.relation {
+ Some(r) => self.field_with_qualified_name(r, &col.name),
+ None => self.field_with_unqualified_name(&col.name),
+ }
  }
  }
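
The `ExprSchema` change above leaves `field_from_column` as the only required method and derives the other accessors from it as default methods. A minimal, standard-library-only sketch of that pattern follows. `SimpleField`, `SimpleExprSchema`, `SimpleSchema`, and `example_schema` are hypothetical stand-ins chosen for illustration, with column lookup reduced to a plain name match; the real trait works on `Column`, `Field`, and DataFusion's `Result`.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for an Arrow field (not `arrow_schema::Field`).
#[derive(Debug, Clone, PartialEq)]
struct SimpleField {
    name: String,
    data_type: String,
    nullable: bool,
    metadata: HashMap<String, String>,
}

/// One required method; every other accessor is a default method built on it.
trait SimpleExprSchema {
    /// The only method an implementor has to provide.
    fn field_from_column(&self, col: &str) -> Result<&SimpleField, String>;

    fn nullable(&self, col: &str) -> Result<bool, String> {
        Ok(self.field_from_column(col)?.nullable)
    }

    fn data_type(&self, col: &str) -> Result<&str, String> {
        Ok(self.field_from_column(col)?.data_type.as_str())
    }

    fn metadata(&self, col: &str) -> Result<&HashMap<String, String>, String> {
        Ok(&self.field_from_column(col)?.metadata)
    }
}

/// Hypothetical schema: a flat list of fields looked up by name.
struct SimpleSchema {
    fields: Vec<SimpleField>,
}

impl SimpleExprSchema for SimpleSchema {
    fn field_from_column(&self, col: &str) -> Result<&SimpleField, String> {
        self.fields
            .iter()
            .find(|f| f.name == col)
            .ok_or_else(|| format!("field not found: {col}"))
    }
}

/// Builds a one-column schema for demonstration.
fn example_schema() -> SimpleSchema {
    SimpleSchema {
        fields: vec![SimpleField {
            name: "a".to_string(),
            data_type: "Int32".to_string(),
            nullable: true,
            metadata: HashMap::new(),
        }],
    }
}
```

With this shape an implementor writes one lookup and gets `nullable`, `data_type`, and `metadata` for free, which is the reduction visible in the `impl ExprSchema for DFSchema` hunk.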

2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
@@ -95,7 +95,7 @@ extended_tests = []
  [dependencies]
  arrow = { workspace = true }
  arrow-ipc = { workspace = true }
- arrow-schema = { workspace = true }
+ arrow-schema = { workspace = true, features = ["canonical_extension_types"] }
  async-trait = { workspace = true }
  bytes = { workspace = true }
  bzip2 = { version = "0.5.2", optional = true }
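
For context on the `canonical_extension_types` feature enabled above: in the Arrow format, an extension type is recorded as field metadata under the keys `ARROW:extension:name` and `ARROW:extension:metadata`. The sketch below shows that convention with a plain `HashMap` and does not call `arrow-schema` at all. The helper names `tag_extension` and `extension_name` are hypothetical, and how this PR's tests actually use the feature is not shown in this diff.

```rust
use std::collections::HashMap;

/// Metadata key holding the extension type's name, per the Arrow format.
const EXTENSION_TYPE_NAME_KEY: &str = "ARROW:extension:name";
/// Metadata key holding the extension type's serialized parameters, if any.
const EXTENSION_TYPE_METADATA_KEY: &str = "ARROW:extension:metadata";

/// Hypothetical helper: mark a field's metadata map as describing an extension type.
fn tag_extension(
    metadata: &mut HashMap<String, String>,
    name: &str,
    serialized: Option<&str>,
) {
    metadata.insert(EXTENSION_TYPE_NAME_KEY.to_string(), name.to_string());
    if let Some(s) = serialized {
        metadata.insert(EXTENSION_TYPE_METADATA_KEY.to_string(), s.to_string());
    }
}

/// Hypothetical helper: read the extension type name back, if one is set.
fn extension_name(metadata: &HashMap<String, String>) -> Option<&str> {
    metadata.get(EXTENSION_TYPE_NAME_KEY).map(String::as_str)
}
```

A function that receives only a data type would see the storage type (for `arrow.uuid`, a 16-byte fixed-size binary) and could not tell it apart from ordinary binary data; the metadata map is what distinguishes the two.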
Original file line number Diff line number Diff line change
@@ -128,7 +128,7 @@ fn test_update_matching_exprs() -> Result<()> {
  Arc::new(Column::new("b", 1)),
  )),
  ],
- DataType::Int32,
+ Field::new("f", DataType::Int32, true),
  )),
  Arc::new(CaseExpr::try_new(
  Some(Arc::new(Column::new("d", 2))),
@@ -193,7 +193,7 @@ fn test_update_matching_exprs() -> Result<()> {
  Arc::new(Column::new("b", 1)),
  )),
  ],
- DataType::Int32,
+ Field::new("f", DataType::Int32, true),
  )),
  Arc::new(CaseExpr::try_new(
  Some(Arc::new(Column::new("d", 3))),
@@ -261,7 +261,7 @@ fn test_update_projected_exprs() -> Result<()> {
  Arc::new(Column::new("b", 1)),
  )),
  ],
- DataType::Int32,
+ Field::new("f", DataType::Int32, true),
  )),
  Arc::new(CaseExpr::try_new(
  Some(Arc::new(Column::new("d", 2))),
@@ -326,7 +326,7 @@ fn test_update_projected_exprs() -> Result<()> {
  Arc::new(Column::new("b_new", 1)),
  )),
  ],
- DataType::Int32,
+ Field::new("f", DataType::Int32, true),
  )),
  Arc::new(CaseExpr::try_new(
  Some(Arc::new(Column::new("d_new", 3))),
2 changes: 1 addition & 1 deletion datafusion/core/tests/tpc-ds/49.sql
@@ -110,7 +110,7 @@ select channel, item, return_ratio, return_rank, currency_rank from
  where
  sr.sr_return_amt > 10000
  and sts.ss_net_profit > 1
- and sts.ss_net_paid > 0
+ and sts.ss_net_paid > 0
  and sts.ss_quantity > 0
  and ss_sold_date_sk = d_date_sk
  and d_year = 2000