From a3e116e9eaca1b3808da46fce79299b1c64469db Mon Sep 17 00:00:00 2001 From: Alexander Beedie Date: Wed, 29 May 2024 17:36:34 +0400 Subject: [PATCH] fix: Resolve multiple SQL `JOIN` issues (#16507) --- Cargo.lock | 1 + crates/polars-lazy/src/frame/mod.rs | 11 +- crates/polars-lazy/src/tests/cse.rs | 2 +- .../src/tests/projection_queries.rs | 2 +- crates/polars-lazy/src/tests/queries.rs | 2 +- crates/polars-lazy/src/tests/streaming.rs | 14 +- crates/polars-ops/src/frame/join/args.rs | 5 + crates/polars-plan/src/dsl/expr.rs | 2 +- .../optimizer/projection_pushdown/joins.rs | 2 +- crates/polars-sql/Cargo.toml | 1 + crates/polars-sql/src/context.rs | 147 +++++++--- crates/polars-sql/src/sql_expr.rs | 46 ++-- crates/polars-sql/tests/statements.rs | 255 ++++++++++++++---- .../rust/user-guide/transformations/joins.rs | 2 +- py-polars/tests/unit/sql/test_joins.py | 109 +++++++- .../tests/unit/sql/test_miscellaneous.py | 2 +- py-polars/tests/unit/sql/test_subqueries.py | 79 ++---- py-polars/tests/unit/test_queries.py | 2 +- 18 files changed, 492 insertions(+), 192 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e4e8b015c8c3..2703f5de5a08 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3193,6 +3193,7 @@ dependencies = [ "polars-core", "polars-error", "polars-lazy", + "polars-ops", "polars-plan", "rand", "serde", diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index 66d12dc18b8a..44155a040995 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -1085,8 +1085,13 @@ impl LazyFrame { /// Creates the Cartesian product from both frames, preserving the order of the left keys. #[cfg(feature = "cross_join")] - pub fn cross_join(self, other: LazyFrame) -> LazyFrame { - self.join(other, vec![], vec![], JoinArgs::new(JoinType::Cross)) + pub fn cross_join(self, other: LazyFrame, suffix: Option) -> LazyFrame { + self.join( + other, + vec![], + vec![], + JoinArgs::new(JoinType::Cross).with_suffix(suffix), + ) } /// Left outer join this query with another lazy query. @@ -1237,9 +1242,7 @@ impl LazyFrame { if let Some(suffix) = args.suffix { builder = builder.suffix(suffix); } - // Note: args.slice is set by the optimizer - builder.finish() } diff --git a/crates/polars-lazy/src/tests/cse.rs b/crates/polars-lazy/src/tests/cse.rs index 4452b3845b46..95c6c5be64df 100644 --- a/crates/polars-lazy/src/tests/cse.rs +++ b/crates/polars-lazy/src/tests/cse.rs @@ -305,7 +305,7 @@ fn test_cse_columns_projections() -> PolarsResult<()> { ]? .lazy(); - let left = left.cross_join(right.clone().select([col("A")])); + let left = left.cross_join(right.clone().select([col("A")]), None); let q = left.join( right.rename(["B"], ["C"]), [col("A"), col("C")], diff --git a/crates/polars-lazy/src/tests/projection_queries.rs b/crates/polars-lazy/src/tests/projection_queries.rs index 71e43ab10d3e..e3d5cb9f25dd 100644 --- a/crates/polars-lazy/src/tests/projection_queries.rs +++ b/crates/polars-lazy/src/tests/projection_queries.rs @@ -47,7 +47,7 @@ fn test_cross_join_pd() -> PolarsResult<()> { "price" => [5, 4] ]?; - let q = food.lazy().cross_join(drink.lazy()).select([ + let q = food.lazy().cross_join(drink.lazy(), None).select([ col("name").alias("food"), col("name_right").alias("beverage"), (col("price") + col("price_right")).alias("total"), diff --git a/crates/polars-lazy/src/tests/queries.rs b/crates/polars-lazy/src/tests/queries.rs index 822c830d9d1c..7b3c76487080 100644 --- a/crates/polars-lazy/src/tests/queries.rs +++ b/crates/polars-lazy/src/tests/queries.rs @@ -1171,7 +1171,7 @@ fn test_cross_join() -> PolarsResult<()> { "b" => [None, Some(12)] ]?; - let out = df1.lazy().cross_join(df2.lazy()).collect()?; + let out = df1.lazy().cross_join(df2.lazy(), None).collect()?; assert_eq!(out.shape(), (6, 4)); Ok(()) } diff --git a/crates/polars-lazy/src/tests/streaming.rs b/crates/polars-lazy/src/tests/streaming.rs index e34c16a34334..1ca82f18b832 100644 --- a/crates/polars-lazy/src/tests/streaming.rs +++ b/crates/polars-lazy/src/tests/streaming.rs @@ -85,7 +85,7 @@ fn test_streaming_union_order() -> PolarsResult<()> { fn test_streaming_union_join() -> PolarsResult<()> { let q = get_csv_glob(); let q = q.select([col("sugars_g"), col("calories")]); - let q = q.clone().cross_join(q); + let q = q.clone().cross_join(q, None); assert_streaming_with_default(q, true, true); Ok(()) @@ -166,18 +166,22 @@ fn test_streaming_cross_join() -> PolarsResult<()> { "a" => [1 ,2, 3] ]?; let q = df.lazy(); - let out = q.clone().cross_join(q).with_streaming(true).collect()?; + let out = q + .clone() + .cross_join(q, None) + .with_streaming(true) + .collect()?; assert_eq!(out.shape(), (9, 2)); let q = get_parquet_file().with_projection_pushdown(false); let q1 = q .clone() .select([col("calories")]) - .cross_join(q.clone()) + .cross_join(q.clone(), None) .filter(col("calories").gt(col("calories_right"))); let q2 = q1 .select([all().name().suffix("_second")]) - .cross_join(q) + .cross_join(q, None) .filter(col("calories_right_second").lt(col("calories"))) .select([ col("calories"), @@ -266,7 +270,7 @@ fn test_streaming_slice() -> PolarsResult<()> { ]? .lazy(); - let q = lf_a.clone().cross_join(lf_a).slice(10, 20); + let q = lf_a.clone().cross_join(lf_a, None).slice(10, 20); let a = q.with_streaming(true).collect().unwrap(); assert_eq!(a.shape(), (20, 2)); diff --git a/crates/polars-ops/src/frame/join/args.rs b/crates/polars-ops/src/frame/join/args.rs index 4fbc30596834..f19be5352a56 100644 --- a/crates/polars-ops/src/frame/join/args.rs +++ b/crates/polars-ops/src/frame/join/args.rs @@ -88,6 +88,11 @@ impl JoinArgs { self } + pub fn with_suffix(mut self, suffix: Option) -> Self { + self.suffix = suffix; + self + } + pub fn suffix(&self) -> &str { self.suffix.as_deref().unwrap_or("_right") } diff --git a/crates/polars-plan/src/dsl/expr.rs b/crates/polars-plan/src/dsl/expr.rs index 62c16d5e9042..08688fcd5aba 100644 --- a/crates/polars-plan/src/dsl/expr.rs +++ b/crates/polars-plan/src/dsl/expr.rs @@ -304,7 +304,7 @@ pub enum Excluded { impl Expr { /// Get Field result of the expression. The schema is the input data. pub fn to_field(&self, schema: &Schema, ctxt: Context) -> PolarsResult { - // this is not called much and th expression depth is typically shallow + // this is not called much and the expression depth is typically shallow let mut arena = Arena::with_capacity(5); self.to_field_amortized(schema, ctxt, &mut arena) } diff --git a/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/joins.rs b/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/joins.rs index 04fcdb2832c9..4d9aa8f49f35 100644 --- a/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/joins.rs +++ b/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/joins.rs @@ -444,7 +444,7 @@ fn resolve_join_suffixes( .iter() .map(|proj| { let name = column_node_to_name(*proj, expr_arena); - if name.contains(suffix) && schema_after_join.get(&name).is_none() { + if name.ends_with(suffix) && schema_after_join.get(&name).is_none() { let downstream_name = &name.as_ref()[..name.len() - suffix.len()]; let col = AExpr::Column(ColumnName::from(downstream_name)); let node = expr_arena.add(col); diff --git a/crates/polars-sql/Cargo.toml b/crates/polars-sql/Cargo.toml index 65dc5e9523fd..1db0e88a116c 100644 --- a/crates/polars-sql/Cargo.toml +++ b/crates/polars-sql/Cargo.toml @@ -13,6 +13,7 @@ arrow = { workspace = true } polars-core = { workspace = true } polars-error = { workspace = true } polars-lazy = { workspace = true, features = ["abs", "binary_encoding", "concat_str", "cross_join", "cum_agg", "dtype-date", "dtype-decimal", "is_in", "list_eval", "log", "meta", "regex", "round_series", "sign", "string_reverse", "strings", "timezones", "trigonometry"] } +polars-ops = { workspace = true } polars-plan = { workspace = true } hex = { workspace = true } diff --git a/crates/polars-sql/src/context.rs b/crates/polars-sql/src/context.rs index 995f9df5af43..3a6ad0d6604e 100644 --- a/crates/polars-sql/src/context.rs +++ b/crates/polars-sql/src/context.rs @@ -3,18 +3,19 @@ use std::cell::RefCell; use polars_core::prelude::*; use polars_error::to_compute_err; use polars_lazy::prelude::*; +use polars_ops::frame::JoinCoalesce; use polars_plan::prelude::*; use sqlparser::ast::{ - Distinct, ExcludeSelectItem, Expr as SQLExpr, FunctionArg, GroupByExpr, JoinOperator, - ObjectName, ObjectType, Offset, OrderByExpr, Query, Select, SelectItem, SetExpr, SetOperator, - SetQuantifier, Statement, TableAlias, TableFactor, TableWithJoins, UnaryOperator, + Distinct, ExcludeSelectItem, Expr as SQLExpr, FunctionArg, GroupByExpr, JoinConstraint, + JoinOperator, ObjectName, ObjectType, Offset, OrderByExpr, Query, Select, SelectItem, SetExpr, + SetOperator, SetQuantifier, Statement, TableAlias, TableFactor, TableWithJoins, UnaryOperator, Value as SQLValue, WildcardAdditionalOptions, }; use sqlparser::dialect::GenericDialect; use sqlparser::parser::{Parser, ParserOptions}; use crate::function_registry::{DefaultFunctionRegistry, FunctionRegistry}; -use crate::sql_expr::{parse_sql_expr, process_join}; +use crate::sql_expr::{parse_sql_expr, process_join_constraint}; use crate::table_functions::PolarsTableFunctions; /// The SQLContext is the main entry point for executing SQL queries. @@ -22,10 +23,12 @@ use crate::table_functions::PolarsTableFunctions; pub struct SQLContext { pub(crate) table_map: PlHashMap, pub(crate) function_registry: Arc, + pub(crate) lp_arena: Arena, + pub(crate) expr_arena: Arena, + cte_map: RefCell>, - aliases: RefCell>, - lp_arena: Arena, - expr_arena: Arena, + table_aliases: RefCell>, + joined_aliases: RefCell>>, } impl Default for SQLContext { @@ -34,7 +37,8 @@ impl Default for SQLContext { function_registry: Arc::new(DefaultFunctionRegistry {}), table_map: Default::default(), cte_map: Default::default(), - aliases: Default::default(), + table_aliases: Default::default(), + joined_aliases: Default::default(), lp_arena: Default::default(), expr_arena: Default::default(), } @@ -114,7 +118,8 @@ impl SQLContext { .map_err(to_compute_err)? .parse_statements() .map_err(to_compute_err)?; - polars_ensure!(ast.len() == 1, ComputeError: "One and only one statement at a time please"); + + polars_ensure!(ast.len() == 1, ComputeError: "One (and only one) statement at a time please"); let res = self.execute_statement(ast.first().unwrap())?; // Ensure the result uses the proper arenas. @@ -123,9 +128,11 @@ impl SQLContext { let expr_arena = std::mem::take(&mut self.expr_arena); res.set_cached_arena(lp_arena, expr_arena); - // Every execution should clear the CTE map. + // Every execution should clear the statement-level maps. self.cte_map.borrow_mut().clear(); - self.aliases.borrow_mut().clear(); + self.table_aliases.borrow_mut().clear(); + self.joined_aliases.borrow_mut().clear(); + Ok(res) } @@ -148,22 +155,6 @@ impl SQLContext { } impl SQLContext { - fn register_cte(&mut self, name: &str, lf: LazyFrame) { - self.cte_map.borrow_mut().insert(name.to_owned(), lf); - } - - pub(super) fn get_table_from_current_scope(&self, name: &str) -> Option { - let table_name = self.table_map.get(name).cloned(); - table_name - .or_else(|| self.cte_map.borrow().get(name).cloned()) - .or_else(|| { - self.aliases - .borrow() - .get(name) - .and_then(|alias| self.table_map.get(alias).cloned()) - }) - } - pub(crate) fn execute_statement(&mut self, stmt: &Statement) -> PolarsResult { let ast = stmt; Ok(match ast { @@ -194,6 +185,31 @@ impl SQLContext { self.process_limit_offset(lf, &query.limit, &query.offset) } + pub(super) fn get_table_from_current_scope(&self, name: &str) -> Option { + let table_name = self.table_map.get(name).cloned(); + table_name + .or_else(|| self.cte_map.borrow().get(name).cloned()) + .or_else(|| { + self.table_aliases + .borrow() + .get(name) + .and_then(|alias| self.table_map.get(alias).cloned()) + }) + } + + pub(super) fn resolve_name(&self, tbl_name: &str, column_name: &str) -> String { + if self.joined_aliases.borrow().contains_key(tbl_name) { + self.joined_aliases + .borrow() + .get(tbl_name) + .and_then(|aliases| aliases.get(column_name)) + .cloned() + .unwrap_or_else(|| column_name.to_string()) + } else { + column_name.to_string() + } + } + fn process_set_expr(&mut self, expr: &SetExpr, query: &Query) -> PolarsResult { match expr { SetExpr::Select(select_stmt) => self.execute_select(select_stmt, query), @@ -312,6 +328,10 @@ impl SQLContext { } } + fn register_cte(&mut self, name: &str, lf: LazyFrame) { + self.cte_map.borrow_mut().insert(name.to_owned(), lf); + } + fn register_ctes(&mut self, query: &Query) -> PolarsResult<()> { if let Some(with) = &query.with { if with.recursive { @@ -331,41 +351,67 @@ impl SQLContext { let (l_name, mut lf) = self.get_table(&tbl_expr.relation)?; if !tbl_expr.joins.is_empty() { for tbl in &tbl_expr.joins { - let (r_name, rf) = self.get_table(&tbl.relation)?; + let (r_name, mut rf) = self.get_table(&tbl.relation)?; + let left_schema = + lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)?; + let right_schema = + rf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)?; + lf = match &tbl.join_operator { - JoinOperator::CrossJoin => lf.cross_join(rf), JoinOperator::FullOuter(constraint) => { - process_join(lf, rf, constraint, &l_name, &r_name, JoinType::Full)? + self.process_join(lf, rf, constraint, &l_name, &r_name, JoinType::Full)? }, JoinOperator::Inner(constraint) => { - process_join(lf, rf, constraint, &l_name, &r_name, JoinType::Inner)? + self.process_join(lf, rf, constraint, &l_name, &r_name, JoinType::Inner)? }, JoinOperator::LeftOuter(constraint) => { - process_join(lf, rf, constraint, &l_name, &r_name, JoinType::Left)? + self.process_join(lf, rf, constraint, &l_name, &r_name, JoinType::Left)? }, #[cfg(feature = "semi_anti_join")] JoinOperator::LeftAnti(constraint) => { - process_join(lf, rf, constraint, &l_name, &r_name, JoinType::Anti)? + self.process_join(lf, rf, constraint, &l_name, &r_name, JoinType::Anti)? }, #[cfg(feature = "semi_anti_join")] JoinOperator::LeftSemi(constraint) => { - process_join(lf, rf, constraint, &l_name, &r_name, JoinType::Semi)? + self.process_join(lf, rf, constraint, &l_name, &r_name, JoinType::Semi)? }, #[cfg(feature = "semi_anti_join")] JoinOperator::RightAnti(constraint) => { - process_join(rf, lf, constraint, &l_name, &r_name, JoinType::Anti)? + self.process_join(rf, lf, constraint, &l_name, &r_name, JoinType::Anti)? }, #[cfg(feature = "semi_anti_join")] JoinOperator::RightSemi(constraint) => { - process_join(rf, lf, constraint, &l_name, &r_name, JoinType::Semi)? + self.process_join(rf, lf, constraint, &l_name, &r_name, JoinType::Semi)? }, + JoinOperator::CrossJoin => lf.cross_join(rf, Some(format!(":{}", r_name))), join_type => { polars_bail!( InvalidOperation: "join type '{:?}' not yet supported by polars-sql", join_type ); }, - } + }; + + // track join-aliased columns so we can resolve them later + let joined_schema = + lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)?; + self.joined_aliases.borrow_mut().insert( + r_name.to_string(), + right_schema + .iter_names() + .filter_map(|name| { + // col exists in both tables and is aliased in the joined result + let aliased_name = format!("{}:{}", name, r_name); + if left_schema.contains(name) + && joined_schema.contains(aliased_name.as_str()) + { + Some((name.to_string(), aliased_name)) + } else { + None + } + }) + .collect::>(), + ); } }; Ok(lf) @@ -594,6 +640,31 @@ impl SQLContext { Ok(lf) } + pub(super) fn process_join( + &self, + left_tbl: LazyFrame, + right_tbl: LazyFrame, + constraint: &JoinConstraint, + tbl_name: &str, + join_tbl_name: &str, + join_type: JoinType, + ) -> PolarsResult { + let (left_on, right_on) = process_join_constraint(constraint, tbl_name, join_tbl_name)?; + + let joined_tbl = left_tbl + .clone() + .join_builder() + .with(right_tbl.clone()) + .left_on(left_on) + .right_on(right_on) + .how(join_type) + .suffix(format!(":{}", join_tbl_name)) + .coalesce(JoinCoalesce::KeepColumns) + .finish(); + + Ok(joined_tbl) + } + fn process_subqueries(&self, lf: LazyFrame, exprs: Vec<&mut Expr>) -> LazyFrame { let mut contexts = vec![]; for expr in exprs { @@ -660,7 +731,7 @@ impl SQLContext { if let Some(lf) = self.get_table_from_current_scope(tbl_name) { match alias { Some(alias) => { - self.aliases + self.table_aliases .borrow_mut() .insert(alias.name.value.clone(), tbl_name.to_string()); Ok((alias.to_string(), lf)) diff --git a/crates/polars-sql/src/sql_expr.rs b/crates/polars-sql/src/sql_expr.rs index c18b79eb4f87..7a31f8bbb988 100644 --- a/crates/polars-sql/src/sql_expr.rs +++ b/crates/polars-sql/src/sql_expr.rs @@ -299,7 +299,7 @@ impl SQLExprVisitor<'_> { } let mut lf = self.ctx.execute_query_no_ctes(subquery)?; - let schema = lf.schema()?; + let schema = lf.schema_with_arenas(&mut self.ctx.lp_arena, &mut self.ctx.expr_arena)?; if restriction == SubqueryRestriction::SingleColumn { if schema.len() != 1 { @@ -335,7 +335,7 @@ impl SQLExprVisitor<'_> { /// Visit a compound SQL identifier /// /// e.g. df.column or "df"."column" - fn visit_compound_identifier(&self, idents: &[Ident]) -> PolarsResult { + fn visit_compound_identifier(&mut self, idents: &[Ident]) -> PolarsResult { match idents { [tbl_name, column_name] => { let mut lf = self @@ -348,9 +348,15 @@ impl SQLExprVisitor<'_> { ) })?; - let schema = lf.schema()?; + let schema = + lf.schema_with_arenas(&mut self.ctx.lp_arena, &mut self.ctx.expr_arena)?; if let Some((_, name, _)) = schema.get_full(&column_name.value) { - Ok(col(name)) + let resolved = &self.ctx.resolve_name(&tbl_name.value, &column_name.value); + Ok(if name != resolved { + col(resolved).alias(name) + } else { + col(name) + }) } else { polars_bail!( ColumnNotFound: "no column named '{}' found in table '{}'", @@ -959,25 +965,6 @@ impl SQLExprVisitor<'_> { } } -pub(super) fn process_join( - left_tbl: LazyFrame, - right_tbl: LazyFrame, - constraint: &JoinConstraint, - tbl_name: &str, - join_tbl_name: &str, - join_type: JoinType, -) -> PolarsResult { - let (left_on, right_on) = process_join_constraint(constraint, tbl_name, join_tbl_name)?; - - Ok(left_tbl - .join_builder() - .with(right_tbl) - .left_on(left_on) - .right_on(right_on) - .how(join_type) - .finish()) -} - fn collect_compound_identifiers( left: &[Ident], right: &[Ident], @@ -988,12 +975,11 @@ fn collect_compound_identifiers( let (tbl_a, col_a) = (&left[0].value, &left[1].value); let (tbl_b, col_b) = (&right[0].value, &right[1].value); - if left_name == tbl_a && right_name == tbl_b { - Ok((vec![col(col_a)], vec![col(col_b)])) - } else if left_name == tbl_b && right_name == tbl_a { + // switch left/right operands if the caller has them in reverse + if left_name == tbl_b || right_name == tbl_a { Ok((vec![col(col_b)], vec![col(col_a)])) } else { - polars_bail!(InvalidOperation: "collect_compound_identifiers: left_name={:?}, right_name={:?}, tbl_a={:?}, tbl_b={:?}", left_name, right_name, tbl_a, tbl_b); + Ok((vec![col(col_a)], vec![col(col_b)])) } } else { polars_bail!(InvalidOperation: "collect_compound_identifiers: Expected left.len() == 2 && right.len() == 2, but found left.len() == {:?}, right.len() == {:?}", left.len(), right.len()); @@ -1042,9 +1028,9 @@ pub(super) fn process_join_constraint( if let JoinConstraint::On(SQLExpr::BinaryOp { left, op, right }) = constraint { if op == &BinaryOperator::And { let (mut left_on, mut right_on) = process_join_on(left, left_name, right_name)?; - let (left_on_2, right_on_2) = process_join_on(right, left_name, right_name)?; - left_on.extend(left_on_2); - right_on.extend(right_on_2); + let (left_on_, right_on_) = process_join_on(right, left_name, right_name)?; + left_on.extend(left_on_); + right_on.extend(right_on_); return Ok((left_on, right_on)); } if op != &BinaryOperator::Eq { diff --git a/crates/polars-sql/tests/statements.rs b/crates/polars-sql/tests/statements.rs index 712df6873d90..922603aeac79 100644 --- a/crates/polars-sql/tests/statements.rs +++ b/crates/polars-sql/tests/statements.rs @@ -188,11 +188,17 @@ fn iss_9560_join_as() { let expected = df! { "id" => [1, 2, 3, 4], "ano" => [2, 3, 4, 5], - "ano_right" => [2, 3, 4, 5], + "id:t2" => [1, 2, 3, 4], + "ano:t2" => [2, 3, 4, 5], } .unwrap(); - assert!(actual.equals(&expected)); + assert!( + actual.equals(&expected), + "expected = {:?}\nactual={:?}", + expected, + actual + ); } fn prepare_compound_join_context() -> SQLContext { @@ -206,11 +212,10 @@ fn prepare_compound_join_context() -> SQLContext { "b" => [0, 3, 4, 5, 6] } .unwrap(); - let df3 = df! { "a" => [1, 2, 3, 4, 5], - "b" => [0, 3, 4, 5, 6], - "c" => [0, 3, 4, 5, 6] + "b" => [0, 3, 4, 5, 7], + "c" => [1, 3, 4, 5, 7] } .unwrap(); let mut ctx = SQLContext::new(); @@ -232,6 +237,8 @@ fn test_compound_join_basic() { let expected = df! { "a" => [2, 3], "b" => [3, 4], + "a:df2" => [2, 3], + "b:df2" => [3, 4], } .unwrap(); @@ -253,22 +260,26 @@ fn test_compound_join_different_column_names() { let df2 = df! { "a" => [0, 2, 3, 4, 5], "b" => [1, 2, 3, 5, 6], - "c" => [7, 8, 9, 10, 11], + "c" => [7, 5, 3, 5, 7], } .unwrap(); + let mut ctx = SQLContext::new(); - ctx.register("df1", df1.lazy()); - ctx.register("df2", df2.lazy()); + ctx.register("lf1", df1.lazy()); + ctx.register("lf2", df2.lazy()); let sql = r#" - SELECT * FROM df1 INNER JOIN df2 ON df1.a = df2.b AND df1.b = df2.a + SELECT lf1.a, lf2.b, lf2.c + FROM lf1 INNER JOIN lf2 + -- note: uses "lf1.a" for *both* constraint arms + ON lf1.a = lf2.b AND lf1.a = lf2.c + ORDER BY a "#; let actual = ctx.execute(sql).unwrap().collect().unwrap(); - let expected = df! { - "a" => [2, 3], - "b" => [2, 3], - "c" => [8, 9], + "a" => [3, 5], + "b" => [3, 5], + "c" => [3, 5], } .unwrap(); @@ -284,14 +295,13 @@ fn test_compound_join_different_column_names() { fn test_compound_join_three_tables() { let mut ctx = prepare_compound_join_context(); let sql = r#" - SELECT * FROM df1 - INNER JOIN df2 - ON df1.a = df2.a AND df1.b = df2.b - INNER JOIN df3 - ON df1.a = df3.a AND df1.b = df3.b + SELECT df3.* FROM df1 + INNER JOIN df2 + ON df1.a = df2.a AND df1.b = df2.b + INNER JOIN df3 + ON df3.a = df1.a AND df3.b = df1.b "#; let actual = ctx.execute(sql).unwrap().collect().unwrap(); - let expected = df! { "a" => [2, 3], "b" => [3, 4], @@ -323,34 +333,89 @@ fn test_compound_join_nested_and() { "d" => [0, 3, 4, 5, 6] } .unwrap(); + let mut ctx = SQLContext::new(); ctx.register("df1", df1.lazy()); ctx.register("df2", df2.lazy()); - let sql = r#" - SELECT * FROM df1 - INNER JOIN df2 ON - df1.a = df2.a AND - df1.b = df2.b AND - df1.c = df2.c AND - df1.d = df2.d - "#; - let actual = ctx.execute(sql).unwrap().collect().unwrap(); + for cols in [ + "df1.*", + "df2.*", + "df1.a, df1.b, df2.c, df2.d", + "df2.a, df2.b, df1.c, df1.d", + ] { + let sql = format!( + r#" + SELECT {} FROM df1 + INNER JOIN df2 ON + df1.a = df2.a AND + df1.b = df2.b AND + df1.c = df2.c AND + df1.d = df2.d + "#, + cols + ); + let actual = ctx.execute(sql.as_str()).unwrap().collect().unwrap(); + let expected = df! { + "a" => [1, 3], + "b" => [1, 3], + "c" => [0, 4], + "d" => [0, 4], + } + .unwrap(); + + assert!( + actual.equals(&expected), + "expected = {:?}\nactual={:?}", + expected, + actual + ); + } +} - let expected = df! { - "a" => [1, 3], - "b" => [1, 3], - "c" => [0, 4], - "d" => [0, 4], +#[test] +fn test_resolve_join_column_select_13618() { + let df1 = df! { + "A" => [1, 2, 3, 4, 5], + "B" => [5, 4, 3, 2, 1], + "fruits" => ["banana", "banana", "apple", "apple", "banana"], + "cars" => ["beetle", "audi", "beetle", "beetle", "beetle"], } .unwrap(); + let df2 = df1.clone(); - assert!( - actual.equals(&expected), - "expected = {:?}\nactual={:?}", - expected, - actual - ); + let mut ctx = SQLContext::new(); + ctx.register("tbl", df1.lazy()); + ctx.register("other", df2.lazy()); + + let join_types = vec!["LEFT", "INNER", "FULL OUTER", ""]; + for join_type in join_types { + let sql = format!( + r#" + SELECT tbl.A, other.B, tbl.fruits, other.cars + FROM tbl + {} JOIN other ON tbl.A = other.B + ORDER BY tbl.A ASC + "#, + join_type + ); + let actual = ctx.execute(sql.as_str()).unwrap().collect().unwrap(); + let expected = df! { + "A" => [1, 2, 3, 4, 5], + "B" => [1, 2, 3, 4, 5], + "fruits" => ["banana", "banana", "apple", "apple", "banana"], + "cars" => ["beetle", "beetle", "beetle", "audi", "beetle"], + } + .unwrap(); + + assert!( + actual.equals(&expected), + "({} JOIN) expected = {:?}\nactual={:?}", + join_type, + expected, + actual + ); + } } #[test] @@ -360,26 +425,50 @@ fn test_compound_join_nested_and_with_brackets() { "b" => [1, 2, 3, 4, 5], "c" => [0, 3, 4, 5, 6], "d" => [0, 3, 4, 5, 6], + "e" => ["a", "b", "c", "d", "?"], } .unwrap(); let df2 = df! { "a" => [1, 2, 3, 4, 5], "b" => [1, 3, 3, 5, 6], "c" => [0, 3, 4, 5, 6], - "d" => [0, 3, 4, 5, 6] + "d" => [0, 3, 4, 5, 6], + "e" => ["w", "x", "y", "z", "!"], } .unwrap(); + let mut ctx = SQLContext::new(); ctx.register("df1", df1.lazy()); ctx.register("df2", df2.lazy()); let sql = r#" - SELECT * FROM df1 - INNER JOIN df2 ON - df1.a = df2.a AND - ((df1.b = df2.b AND - df1.c = df2.c) AND - df1.d = df2.d) + SELECT df1.* EXCLUDE "e", df2.e + FROM df1 + INNER JOIN df2 ON df1.a = df2.a AND + ((df1.b = df2.b AND df1.c = df2.c) AND df1.d = df2.d) + "#; + let actual = ctx.execute(sql).unwrap().collect().unwrap(); + let expected = df! { + "a" => [1, 3], + "b" => [1, 3], + "c" => [0, 4], + "d" => [0, 4], + "e" => ["w", "y"], + } + .unwrap(); + + assert!( + actual.equals(&expected), + "expected = {:?}\nactual={:?}", + expected, + actual + ); + + let sql = r#" + SELECT * EXCLUDE ("e", "e:df2"), df1.e + FROM df1 + INNER JOIN df2 ON df1.a = df2.a AND + ((df1.b = df2.b AND df1.c = df2.c) AND df1.d = df2.d) "#; let actual = ctx.execute(sql).unwrap().collect().unwrap(); @@ -388,9 +477,83 @@ fn test_compound_join_nested_and_with_brackets() { "b" => [1, 3], "c" => [0, 4], "d" => [0, 4], + "a:df2" => [1, 3], + "b:df2" => [1, 3], + "c:df2" => [0, 4], + "d:df2" => [0, 4], + "e" => ["a", "c"], + } + .unwrap(); + + assert!( + actual.equals(&expected), + "expected = {:?}\nactual={:?}", + expected, + actual + ); +} + +#[test] +fn test_join_on_different_keys() { + let df1 = df! {"x" => [-1, 0, 1, 2, 3, 4]}.unwrap(); + let df2 = df! {"y" => [0, 1, -2, 3, 5, 6]}.unwrap(); + + let mut ctx = SQLContext::new(); + ctx.register("df1", df1.lazy()); + ctx.register("df2", df2.lazy()); + + // join on x = y + let sql = r#" + SELECT df2.* + FROM df1 + INNER JOIN df2 ON df1.x = df2.y + ORDER BY y + "#; + let actual = ctx.execute(sql).unwrap().collect().unwrap(); + let expected = df! {"y" => [0, 1, 3]}.unwrap(); + assert!( + actual.equals(&expected), + "expected = {:?}\nactual={:?}", + expected, + actual + ); +} + +#[test] +fn test_join_utf8() { + // (色) color and (野菜) vegetable + let df1 = df! { + "色" => ["赤", "緑", "黄色"], + "野菜" => ["トマト", "ケール", "コーン"], + } + .unwrap(); + + // (色) color and (動物) animal + let df2 = df! { + "色" => ["黄色", "緑", "赤"], + "動物" => ["ゴシキヒワ", "蛙", "レッサーパンダ"], } .unwrap(); + let mut ctx = SQLContext::new(); + ctx.register("df1", df1.lazy()); + ctx.register("df2", df2.lazy()); + + let expected = df! { + "色" => ["緑", "赤", "黄色"], // green, red, yellow + "野菜" => ["ケール", "トマト", "コーン"], // kale, tomato, corn + "動物" => ["蛙", "レッサーパンダ", "ゴシキヒワ"], // frog, red panda, goldfinch + } + .unwrap(); + + let sql = r#" + SELECT df1.*, df2.動物 + FROM df1 + INNER JOIN df2 ON df1.色 = df2.色 + ORDER BY 色 + "#; + let actual = ctx.execute(sql).unwrap().collect().unwrap(); + assert!( actual.equals(&expected), "expected = {:?}\nactual={:?}", diff --git a/docs/src/rust/user-guide/transformations/joins.rs b/docs/src/rust/user-guide/transformations/joins.rs index cc6d7ec9cb6a..26e4a2a067a6 100644 --- a/docs/src/rust/user-guide/transformations/joins.rs +++ b/docs/src/rust/user-guide/transformations/joins.rs @@ -96,7 +96,7 @@ fn main() -> Result<(), Box> { let df_cross_join = df_colors .clone() .lazy() - .cross_join(df_sizes.clone().lazy()) + .cross_join(df_sizes.clone().lazy(), None) .collect()?; println!("{}", &df_cross_join); // --8<-- [end:cross] diff --git a/py-polars/tests/unit/sql/test_joins.py b/py-polars/tests/unit/sql/test_joins.py index 8c8d3069ddc3..0a62b3d9bb13 100644 --- a/py-polars/tests/unit/sql/test_joins.py +++ b/py-polars/tests/unit/sql/test_joins.py @@ -1,5 +1,6 @@ from __future__ import annotations +from io import BytesIO from pathlib import Path import pytest @@ -61,6 +62,45 @@ def test_join_anti_semi(sql: str, expected: pl.DataFrame) -> None: assert_frame_equal(expected, ctx.execute(sql)) +def test_join_cross() -> None: + frames = { + "tbl_a": pl.DataFrame({"a": [1, 2, 3], "b": [4, 0, 6], "c": ["w", "y", "z"]}), + "tbl_b": pl.DataFrame({"a": [3, 2, 1], "b": [6, 5, 4], "c": ["x", "y", "z"]}), + } + with pl.SQLContext(frames, eager_execution=True) as ctx: + out = ctx.execute( + """ + SELECT * + FROM tbl_a + CROSS JOIN tbl_b + ORDER BY a, b, c + """ + ) + assert out.rows() == [ + (1, 4, "w", 3, 6, "x"), + (1, 4, "w", 2, 5, "y"), + (1, 4, "w", 1, 4, "z"), + (2, 0, "y", 3, 6, "x"), + (2, 0, "y", 2, 5, "y"), + (2, 0, "y", 1, 4, "z"), + (3, 6, "z", 3, 6, "x"), + (3, 6, "z", 2, 5, "y"), + (3, 6, "z", 1, 4, "z"), + ] + + +def test_join_cross_11927() -> None: + df1 = pl.DataFrame({"id": [1, 2, 3]}) + df2 = pl.DataFrame({"id": [3, 4, 5]}) # noqa: F841 + df3 = pl.DataFrame({"id": [4, 5, 6]}) # noqa: F841 + + res = df1.sql("SELECT df2.id FROM self CROSS JOIN df2 WHERE self.id = df2.id") + assert_frame_equal(res, pl.DataFrame({"id": [3]})) + + res = df1.sql("SELECT * FROM self CROSS JOIN df3 WHERE self.id = df3.id") + assert res.is_empty() + + @pytest.mark.parametrize( "join_clause", [ @@ -88,9 +128,10 @@ def test_join_inner(foods_ipc_path: Path, join_clause: str) -> None: "calories": [45, 20], "fats_g": [0.5, 0.0], "sugars_g": [2, 2], - "calories_right": [45, 45], - "fats_g_right": [0.5, 0.5], - "sugars_g_right": [2, 2], + "category:foods2": ["vegetables", "vegetables"], + "calories:foods2": [45, 45], + "fats_g:foods2": [0.5, 0.5], + "sugars_g:foods2": [2, 2], } @@ -122,6 +163,31 @@ def test_join_inner_multi(join_clause: str) -> None: assert out.collect().rows() == [(1, 4, "z", 25.5)] +def test_join_inner_15663() -> None: + df_a = pl.DataFrame({"LOCID": [1, 2, 3], "VALUE": [0.1, 0.2, 0.3]}) # noqa: F841 + df_b = pl.DataFrame({"LOCID": [1, 2, 3], "VALUE": [25.6, 53.4, 12.7]}) # noqa: F841 + expected = pl.DataFrame( + { + "LOCID": [1, 2, 3], + "VALUE_A": [0.1, 0.2, 0.3], + "VALUE_B": [25.6, 53.4, 12.7], + } + ) + with pl.SQLContext(register_globals=True, eager_execution=True) as ctx: + query = """ + SELECT + a.LOCID, + a.VALUE AS VALUE_A, + b.VALUE AS VALUE_B + FROM df_a AS a + INNER JOIN df_b AS b + USING (LOCID) + ORDER BY LOCID + """ + actual = ctx.execute(query) + assert_frame_equal(expected, actual) + + @pytest.mark.parametrize( "join_clause", [ @@ -179,6 +245,43 @@ def test_join_left_multi_nested() -> None: ] +def test_join_misc_13618() -> None: + import polars as pl + + df = pl.DataFrame( + { + "A": [1, 2, 3, 4, 5], + "B": [5, 4, 3, 2, 1], + "fruits": ["banana", "banana", "apple", "apple", "banana"], + "cars": ["beetle", "audi", "beetle", "beetle", "beetle"], + } + ) + res = ( + pl.SQLContext(t=df, t1=df, eager_execution=True) + .execute("SELECT t.A, t.fruits, t1.B, t1.cars FROM t JOIN t1 ON t.A=t1.B") + .to_dict(as_series=False) + ) + assert res == { + "A": [5, 4, 3, 2, 1], + "fruits": ["banana", "apple", "apple", "banana", "banana"], + "B": [5, 4, 3, 2, 1], + "cars": ["beetle", "audi", "beetle", "beetle", "beetle"], + } + + +def test_join_misc_16255() -> None: + df1 = pl.read_csv(BytesIO(b"id,data\n1,open")) + df2 = pl.read_csv(BytesIO(b"id,data\n1,closed")) # noqa: F841 + res = df1.sql( + """ + SELECT a.id, a.data AS d1, b.data AS d2 + FROM self AS a JOIN df2 AS b + ON a.id = b.id + """ + ) + assert res.rows() == [(1, "open", "closed")] + + @pytest.mark.parametrize( "constraint", ["tbl.a != tbl.b", "tbl.a > tbl.b", "a >= b", "a < b", "b <= a"] ) diff --git a/py-polars/tests/unit/sql/test_miscellaneous.py b/py-polars/tests/unit/sql/test_miscellaneous.py index aaa51904bb4f..c0e46f190918 100644 --- a/py-polars/tests/unit/sql/test_miscellaneous.py +++ b/py-polars/tests/unit/sql/test_miscellaneous.py @@ -180,7 +180,7 @@ def test_order_by(foods_ipc_path: Path) -> None: df.x, df.y as y_alias FROM df - ORDER BY y + ORDER BY y_alias """, eager=True, ) diff --git a/py-polars/tests/unit/sql/test_subqueries.py b/py-polars/tests/unit/sql/test_subqueries.py index 7e9fc0e124d6..6c28372cd811 100644 --- a/py-polars/tests/unit/sql/test_subqueries.py +++ b/py-polars/tests/unit/sql/test_subqueries.py @@ -4,68 +4,33 @@ from polars.testing import assert_frame_equal -def test_join_on_subquery() -> None: - df1 = pl.DataFrame( - { - "x": [-1, 0, 1, 2, 3, 4], - } - ) - - df2 = pl.DataFrame( - { - "y": [0, 1, 2, 3], - } - ) - - sql = pl.SQLContext(df1=df1, df2=df2) - res = sql.execute( - """ - SELECT - * - FROM df1 - INNER JOIN (SELECT * FROM df2) AS df2 - ON df1.x = df2.y - """, - eager=True, - ) - - df_expected_join = pl.DataFrame({"x": [0, 1, 2, 3]}) - assert_frame_equal( - left=res, - right=df_expected_join, - ) - - -def test_from_subquery() -> None: - df1 = pl.DataFrame( - { - "x": [-1, 0, 1, 2, 3, 4], - } - ) - - df2 = pl.DataFrame( - { - "y": [0, 1, 2, 3], - } - ) +@pytest.mark.parametrize( + ("cols", "join_type", "constraint"), + [ + ("x", "INNER", ""), + ("y", "INNER", ""), + ("x", "LEFT", "WHERE y IN (0,1,2,3,4,5)"), + ("y", "LEFT", "WHERE y >= 0"), + ("df1.*", "FULL", "WHERE y >= 0"), + ("df2.*", "FULL", "WHERE x >= 0"), + ("* EXCLUDE y", "LEFT", "WHERE y >= 0"), + ("* EXCLUDE x", "LEFT", "WHERE x >= 0"), + ], +) +def test_from_subquery(cols: str, join_type: str, constraint: str) -> None: + df1 = pl.DataFrame({"x": [-1, 0, 3, 1, 2, -1]}) + df2 = pl.DataFrame({"y": [0, 1, 2, 3]}) sql = pl.SQLContext(df1=df1, df2=df2) res = sql.execute( - """ - SELECT - * - FROM (SELECT * FROM df1) AS df1 - INNER JOIN (SELECT * FROM df2) AS df2 - ON df1.x = df2.y + f""" + SELECT {cols} FROM (SELECT * FROM df1) AS df1 + {join_type} JOIN (SELECT * FROM df2) AS df2 + ON df1.x = df2.y {constraint} """, eager=True, ) - - df_expected_join = pl.DataFrame({"x": [0, 1, 2, 3]}) - assert_frame_equal( - left=res, - right=df_expected_join, - ) + assert sorted(res.to_series()) == [0, 1, 2, 3] def test_in_subquery() -> None: @@ -75,14 +40,12 @@ def test_in_subquery() -> None: "y": [2, 3, 4, 5, 6, 7], } ) - df_other = pl.DataFrame( { "w": [1, 2, 3, 4, 5, 6], "z": [2, 3, 4, 5, 6, 7], } ) - df_chars = pl.DataFrame( { "one": ["a", "b", "c", "d", "e", "f"], diff --git a/py-polars/tests/unit/test_queries.py b/py-polars/tests/unit/test_queries.py index 4ca3851a09d5..b71bbdc18e01 100644 --- a/py-polars/tests/unit/test_queries.py +++ b/py-polars/tests/unit/test_queries.py @@ -62,7 +62,7 @@ def test_overflow_uint16_agg_mean() -> None: pl.DataFrame( { "col1": ["A" for _ in range(1025)], - "col3": [64 for i in range(1025)], + "col3": [64 for _ in range(1025)], } ) .with_columns(