Skip to content

Fix join type coercion when joining 2 relations with the same name via DataFrame API #14387

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use arrow::{
record_batch::RecordBatch,
};
use arrow_array::{
Array, BooleanArray, DictionaryArray, Float32Array, Float64Array, Int8Array,
UnionArray,
record_batch, Array, BooleanArray, DictionaryArray, Float32Array, Float64Array,
Int8Array, UnionArray,
};
use arrow_buffer::ScalarBuffer;
use arrow_schema::{ArrowError, SchemaRef, UnionFields, UnionMode};
Expand Down Expand Up @@ -1121,6 +1121,39 @@ async fn join() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn join_coercion_unnnamed() -> Result<()> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should "unnnamed" be corrected to "unnamed"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, thank you. That is a typo

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
async fn join_coercion_unnnamed() -> Result<()> {
async fn join_coercion_unnamed() -> Result<()> {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let ctx = SessionContext::new();

// Test that join will coerce column types when necessary
// even when the relations don't have unique names
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: when the columns don't have unique names, is it safe to assume that the unqualified columns in left_cols/right_cols refer to the left/right relations, respectively? Since technically a valid join predicate is for example, t1.id = t1.id.

Seems like it would make sense to me most of the time! But there is some level of implicitness here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when the columns don't have unique names, is it safe to assume that the unqualified columns in left_cols/right_cols refer to the left/right relations, respectively?

Yes, i think so: https://docs.rs/datafusion/latest/datafusion/dataframe/struct.DataFrame.html#method.join

Though perhaps we could make that explicit in the docs 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that would be a good idea! 🙌🏾

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any chance you are willing to make a PR? I would be happy to review it :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure! Add a small doc update here #14393

let left = ctx.read_batch(record_batch!(
("id", Int32, [1, 2, 3]),
("name", Utf8, ["a", "b", "c"])
)?)?;
let right = ctx.read_batch(record_batch!(
("id", Int32, [10, 3]),
("name", Utf8View, ["d", "c"]) // Utf8View is a different type
)?)?;
let cols = vec!["name", "id"];

let filter = None;
let join = right.join(left, JoinType::LeftAnti, &cols, &cols, filter)?;
let results = join.collect().await?;

assert_batches_sorted_eq!(
[
"+----+------+",
"| id | name |",
"+----+------+",
"| 10 | d |",
"+----+------+",
],
&results
);
Ok(())
}

#[tokio::test]
async fn join_on() -> Result<()> {
let left = test_table_with_name("a")
Expand Down
23 changes: 17 additions & 6 deletions datafusion/optimizer/src/analyzer/type_coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,15 @@ impl<'a> TypeCoercionRewriter<'a> {
.map(|(lhs, rhs)| {
// coerce the arguments as though they were a single binary equality
// expression
let (lhs, rhs) = self.coerce_binary_op(lhs, Operator::Eq, rhs)?;
let left_schema = join.left.schema();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Join output schema is the left schema concatenated with the right schema

Normally, since the lhs and rhs have fully qualified names (t1.a, t2.a) resolving their types in terms of the output schema is not a problem

However, when the relations on both sides have the same relation name but different types (which can happen with DataFrames) it is important to resolve the types in terms of the left/right schema

let right_schema = join.right.schema();
let (lhs, rhs) = self.coerce_binary_op(
lhs,
left_schema,
Operator::Eq,
rhs,
right_schema,
)?;
Ok((lhs, rhs))
})
.collect::<Result<Vec<_>>>()?;
Expand Down Expand Up @@ -275,17 +283,19 @@ impl<'a> TypeCoercionRewriter<'a> {
fn coerce_binary_op(
&self,
left: Expr,
left_schema: &DFSchema,
op: Operator,
right: Expr,
right_schema: &DFSchema,
) -> Result<(Expr, Expr)> {
let (left_type, right_type) = get_input_types(
&left.get_type(self.schema)?,
&left.get_type(left_schema)?,
&op,
&right.get_type(self.schema)?,
&right.get_type(right_schema)?,
)?;
Ok((
left.cast_to(&left_type, self.schema)?,
right.cast_to(&right_type, self.schema)?,
left.cast_to(&left_type, left_schema)?,
right.cast_to(&right_type, right_schema)?,
))
}
}
Expand Down Expand Up @@ -404,7 +414,8 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> {
))))
}
Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
let (left, right) = self.coerce_binary_op(*left, op, *right)?;
let (left, right) =
self.coerce_binary_op(*left, self.schema, op, *right, self.schema)?;
Ok(Transformed::yes(Expr::BinaryExpr(BinaryExpr::new(
Box::new(left),
op,
Expand Down
Loading