Skip to content

[DO NOT MERGE]Testing nested correlations handling #50720

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 49 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
9a6f982
add unresolved outer attrs
AveryQi115 Mar 15, 2025
52d5ce7
fix wrong number of arguments error; fix assertions
AveryQi115 Mar 15, 2025
e3bfef4
fix wrong number of arguments error
AveryQi115 Mar 15, 2025
995ffdd
fix wrong number of arguments error
AveryQi115 Mar 15, 2025
08d3cce
fix for mis-deleting ScalarSubquery.withNewOuterAttrs
AveryQi115 Mar 15, 2025
471f084
fmt
AveryQi115 Mar 15, 2025
4e0bf74
fix wrong number of arguments error
AveryQi115 Mar 15, 2025
bc9179e
fix wrong number of arguments error
AveryQi115 Mar 15, 2025
9559dbc
rename unresolved outer attrs to nested outer attrs
AveryQi115 Apr 9, 2025
edd6828
Analyzer support nested correlated subqueries
AveryQi115 Apr 9, 2025
bbfdd7b
fix compilation error
AveryQi115 Apr 9, 2025
dbb2dd1
throw internalErrors and format
AveryQi115 Apr 15, 2025
4500892
compile and format
AveryQi115 Apr 15, 2025
5886273
testing
AveryQi115 Apr 15, 2025
31937b6
try to align errors
averyqi-db Apr 16, 2025
4880813
remove temporary test first
averyqi-db Apr 16, 2025
fc37a5e
restore FunctionTableSubqueryArgumentExpression, output UNRESOLVED_CO…
averyqi-db Apr 16, 2025
9457a27
update updateOuterReferences for nested correlation
averyqi-db Apr 16, 2025
3acaafd
scalafmt
averyqi-db Apr 16, 2025
5de70ee
remove assertion as we might have duplicate column identifiers
averyqi-db Apr 16, 2025
1f6f000
new error type
averyqi-db Apr 17, 2025
aba5e81
format new error type
averyqi-db Apr 17, 2025
2787777
try regenerate golden files
averyqi-db Apr 17, 2025
e51ce61
update ResolveSubquerySuite
averyqi-db Apr 18, 2025
27c909c
fix ResolveSubquerySuite.scala
averyqi-db Apr 18, 2025
a318f1e
restore same behavior for lateral subqueyr
averyqi-db Apr 18, 2025
154b1db
restore error msg for lateral subquery
averyqi-db Apr 18, 2025
f9e2b23
update subquery's nested outer references in resolveAggregateFunction…
averyqi-db Apr 18, 2025
c428df8
remove temporary tests
averyqi-db Apr 18, 2025
8026f89
Merge branch 'master' into AveryQi115/analyzer_support_nested_correla…
AveryQi115 Apr 18, 2025
9932efe
restore missing_attributes error
averyqi-db Apr 18, 2025
508065d
add test
averyqi-db Apr 19, 2025
4faca48
generate test
averyqi-db Apr 19, 2025
9a31a2f
test
averyqi-db Apr 21, 2025
bb97392
deduplicate
averyqi-db Apr 21, 2025
26fb9fb
summarize not supported
averyqi-db Apr 21, 2025
7f30dfa
add new configs to control subquery type level feature
averyqi-db Apr 21, 2025
1465741
queries returning nonderterministic results are also supported.
averyqi-db Apr 21, 2025
9794f0f
ignore tests under nestedcorrelation in ThriftServerQueryTestSuite
averyqi-db Apr 21, 2025
8c3ce16
rename nestedOuterAttrs to outerScopeAttrs
averyqi-db Apr 21, 2025
a18e598
resolve comments
averyqi-db Apr 22, 2025
93d2003
revert deduplication because we don't want to change current behavior
averyqi-db Apr 23, 2025
d86f4b6
add unresolved outer attrs
AveryQi115 Mar 15, 2025
c3aa2b9
init
averyqi-db Apr 22, 2025
0373be1
style
averyqi-db Apr 23, 2025
3f60249
validateOuterScopeAttrs are used to check new outerScopeAttrs
averyqi-db Apr 29, 2025
5565bb3
Fix errors for subqueries in the having clause
averyqi-db Apr 29, 2025
08dbc74
new testcases
averyqi-db Apr 29, 2025
c7fe834
fix OptimizeOneRowRelationSubquery
averyqi-db Apr 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -4064,6 +4064,12 @@
],
"sqlState" : "07501"
},
"NESTED_REFERENCES_IN_SUBQUERY_NOT_SUPPORTED" : {
"message" : [
"Detected outer scope references <expression> in the subquery.This is not supported in the current version."
],
"sqlState" : "0A000"
},
"NONEXISTENT_FIELD_NAME_IN_LIST" : {
"message" : [
"Field(s) <nonExistFields> do(es) not exist. Available fields: <fieldNames>"
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,68 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString
}
}

def checkNoNestedOuterReferencesInMainQuery(plan: LogicalPlan): Unit = {
def hasOuterScopeAttrsInSubqueryExpression(expr: Expression): Boolean = {
expr.exists {
case subExpr: SubqueryExpression if subExpr.getOuterScopeAttrs.nonEmpty => true
case _ => false
}
}

def getOuterScopeAttrsFromSubqueryExpression(
plan: LogicalPlan): Seq[(SubqueryExpression, AttributeSet)] = {
val res = plan.expressions.flatMap {
expr => expr.collect {
case subExpr: SubqueryExpression if subExpr.getOuterScopeAttrs.nonEmpty =>
(subExpr, subExpr.getOuterScopeAttrs)
}
}
res.map {
case (subExpr, nestedOuterExprs) =>
val attrs = nestedOuterExprs.collect {
case a: AttributeReference => a
}
(subExpr, AttributeSet(attrs))
}
}

def findFirstOccurence(
plan: LogicalPlan,
outerScopeAttrs: AttributeSet,
operator: LogicalPlan): (LogicalPlan, AttributeSet) = {
val firstOccuredOperator = operator
plan.foreach {
case p if p.expressions.exists(hasOuterScopeAttrsInSubqueryExpression) =>
val res = getOuterScopeAttrsFromSubqueryExpression(p)
res.find(_._2.intersect(outerScopeAttrs).nonEmpty) match {
case Some((subExpr, outerScopeAttrsInP)) =>
return findFirstOccurence(subExpr.plan,
outerScopeAttrsInP.intersect(outerScopeAttrs), p)
case None => // Do nothing
}
case _ => // Do nothing
}
(firstOccuredOperator, outerScopeAttrs)
}
def throwUnresolvedColumnErrorForOuterScopeAttrs(plan: LogicalPlan): Unit = {
val (subExpr, outerScopeAttrs) = getOuterScopeAttrsFromSubqueryExpression(plan).head
val (operator, missingInput) = findFirstOccurence(subExpr.plan, outerScopeAttrs, plan)
operator.failAnalysis(
errorClass = "MISSING_ATTRIBUTES.RESOLVED_ATTRIBUTE_MISSING_FROM_INPUT",
messageParameters = Map(
"missingAttributes" -> missingInput.toSeq.map(attr => toSQLExpr(attr)).mkString(", "),
"input" -> operator.inputSet.map(attr => toSQLExpr(attr)).mkString(", "),
"operator" -> operator.simpleString(SQLConf.get.maxToStringFields)
)
)
}
plan.foreach {
case p: LogicalPlan if p.expressions.exists(hasOuterScopeAttrsInSubqueryExpression) =>
throwUnresolvedColumnErrorForOuterScopeAttrs(p)
case _ =>
}
}

def checkAnalysis(plan: LogicalPlan): Unit = {
// We should inline all CTE relations to restore the original plan shape, as the analysis check
// may need to match certain plan shapes. For dangling CTE relations, they will still be kept
Expand All @@ -241,6 +303,7 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString
}
preemptedError.clear()
try {
checkNoNestedOuterReferencesInMainQuery(inlinedPlan)
checkAnalysis0(inlinedPlan)
preemptedError.getErrorOpt().foreach(throw _) // throw preempted error if any
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,21 +199,29 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {

// Resolves `UnresolvedAttribute` to `OuterReference`.
protected def resolveOuterRef(e: Expression): Expression = {
val outerPlan = AnalysisContext.get.outerPlan
if (outerPlan.isEmpty) return e
val outerPlanContext = AnalysisContext.get.outerPlans
if (outerPlanContext.isEmpty) return e

def resolve(nameParts: Seq[String]): Option[Expression] = try {
outerPlan.get match {
// Subqueries in UnresolvedHaving can host grouping expressions and aggregate functions.
// We should resolve columns with `agg.output` and the rule `ResolveAggregateFunctions` will
// push them down to Aggregate later. This is similar to what we do in `resolveColumns`.
case u @ UnresolvedHaving(_, agg: Aggregate) =>
agg.resolveChildren(nameParts, conf.resolver)
.orElse(u.resolveChildren(nameParts, conf.resolver))
.map(wrapOuterReference)
case other =>
other.resolveChildren(nameParts, conf.resolver).map(wrapOuterReference)
val outerPlans = outerPlanContext.get
val resolvedExpressions = outerPlans.flatMap {
_ match {
// Subqueries in UnresolvedHaving can host grouping
// expressions and aggregate functions. We should resolve
// columns with `agg.output` and the rule `ResolveAggregateFunctions` will
// push them down to Aggregate later. This is similar to what we do in `resolveColumns`.
case u @ UnresolvedHaving(_, agg: Aggregate) =>
agg.resolveChildren(nameParts, conf.resolver)
.orElse(u.resolveChildren(nameParts, conf.resolver))
.map(wrapOuterReference)
case other =>
other.resolveChildren(nameParts, conf.resolver).map(wrapOuterReference)
}
}
// We use the first resolved expression here
// as the outerPlans are ordered by their depth and the
// first one is the closest to the subquery scope.
resolvedExpressions.headOption
} catch {
case ae: AnalysisException =>
logDebug(ae.getMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
Expand Down Expand Up @@ -111,9 +112,13 @@ object ValidateSubqueryExpression
case f: Filter =>
if (hasOuterReferences(expr.plan)) {
expr.plan.expressions.foreach(_.foreachUp {
case o: OuterReference =>
case o@OuterReference(a) =>
p.children.foreach(e =>
if (!e.output.exists(_.exprId == o.exprId)) {
if (!e.output.exists(_.exprId == o.exprId) &&
!expr.getOuterScopeAttrs.contains(a)) {
// If the outer reference is not found in the children plan,
// it should be a outer scope reference. Otherwise, it is
// invalid.
o.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
"CORRELATED_COLUMN_NOT_FOUND",
Expand All @@ -125,11 +130,53 @@ object ValidateSubqueryExpression
case _ =>
}

def checkNestedOuterReferences(expr: SubqueryExpression): Unit = {
if (expr.getOuterScopeAttrs.nonEmpty) {
if (!SQLConf.get.getConf(SQLConf.SUPPORT_NESTED_CORRELATED_SUBQUERIES)) {
throw new AnalysisException(
errorClass = "NESTED_REFERENCES_IN_SUBQUERY_NOT_SUPPORTED",
messageParameters = Map(
"expression" -> expr.getOuterScopeAttrs.map(_.sql).mkString(","))
)
}
expr match {
case _: ScalarSubquery if
!SQLConf.get.getConf(
SQLConf.SUPPORT_NESTED_CORRELATED_SUBQUERIES_FOR_SCALARSUBQUERIES) =>
throw new AnalysisException(
errorClass = "NESTED_REFERENCES_IN_SUBQUERY_NOT_SUPPORTED",
messageParameters = Map(
"expression" -> expr.getOuterScopeAttrs.map(_.sql).mkString(","))
)
case _: ListQuery if
!SQLConf.get.getConf(
SQLConf.SUPPORT_NESTED_CORRELATED_SUBQUERIES_FOR_INSUBQUERIES) =>
throw new AnalysisException(
errorClass = "NESTED_REFERENCES_IN_SUBQUERY_NOT_SUPPORTED",
messageParameters = Map(
"expression" -> expr.getOuterScopeAttrs.map(_.sql).mkString(","))
)
case _: Exists if
!SQLConf.get.getConf(
SQLConf.SUPPORT_NESTED_CORRELATED_SUBQUERIES_FOR_EXISTSSUBQUERIES) =>
throw new AnalysisException(
errorClass = "NESTED_REFERENCES_IN_SUBQUERY_NOT_SUPPORTED",
messageParameters = Map(
"expression" -> expr.getOuterScopeAttrs.map(_.sql).mkString(","))
)
case _ => // Do nothing
}
}
}

// Check if there are nested correlated subqueries in the plan.
checkNestedOuterReferences(expr)

// Check if there is outer attribute that cannot be found from the plan.
checkOuterReference(plan, expr)

expr match {
case ScalarSubquery(query, outerAttrs, _, _, _, _, _) =>
case ScalarSubquery(query, outerAttrs, _, _, _, _, _, _) =>
// Scalar subquery must return one column as output.
if (query.output.size != 1) {
throw QueryCompilationErrors.subqueryReturnMoreThanOneColumn(query.output.size,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ case class SQLFunction(
case (None, Some(Project(expr :: Nil, _: OneRowRelation)))
if !isTableFunc =>
(Some(expr), None)
case (Some(ScalarSubquery(Project(expr :: Nil, _: OneRowRelation), _, _, _, _, _, _)), None)
if !isTableFunc =>
case (Some(ScalarSubquery(Project(expr :: Nil, _: OneRowRelation),
_, _, _, _, _, _, _)), None) if !isTableFunc =>
(Some(expr), None)
case (_, _) =>
(parsedExpression, parsedQuery)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, LogicalPlan}
Expand All @@ -29,6 +30,8 @@ trait DynamicPruning extends Predicate
* The DynamicPruningSubquery expression is only used in join operations to prune one side of the
* join with a filter from the other side of the join. It is inserted in cases where partition
* pruning can be applied.
* The DynamicPruningSubquery expression should only have a single outer
* attribute which is the pruning key and should not have any outer scope attributes.
*
* @param pruningKey the filtering key of the plan to be pruned.
* @param buildQuery the build side of the join.
Expand All @@ -47,7 +50,7 @@ case class DynamicPruningSubquery(
onlyInBroadcast: Boolean,
exprId: ExprId = NamedExpression.newExprId,
hint: Option[HintInfo] = None)
extends SubqueryExpression(buildQuery, Seq(pruningKey), exprId, Seq.empty, hint)
extends SubqueryExpression(buildQuery, Seq(pruningKey), Seq.empty, exprId, Seq.empty, hint)
with DynamicPruning
with Unevaluable
with UnaryLike[Expression] {
Expand All @@ -67,6 +70,16 @@ case class DynamicPruningSubquery(
copy()
}

override def withNewOuterScopeAttrs(
outerScopeAttrs: Seq[Expression]
): DynamicPruningSubquery = {
if (outerScopeAttrs.nonEmpty) {
throw SparkException.internalError(
"DynamicPruningSubquery should not have outer scope attributes.")
}
this
}

override def withNewHint(hint: Option[HintInfo]): SubqueryExpression = copy(hint = hint)

override lazy val resolved: Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, LogicalPlan, Project, Repartition, RepartitionByExpression, Sort}
import org.apache.spark.sql.catalyst.trees.TreePattern.{FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION, TreePattern}
import org.apache.spark.sql.catalyst.trees.TreePattern.{FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION, NESTED_CORRELATED_SUBQUERY, TreePattern}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.DataType

Expand Down Expand Up @@ -46,6 +46,10 @@ import org.apache.spark.sql.types.DataType
* relation or as a more complex logical plan in the event of a table subquery.
* @param outerAttrs outer references of this subquery plan, generally empty since these table
* arguments do not allow correlated references currently
* @param outerScopeAttrs outer references of the subquery plan that cannot be resolved by the
* direct containing query of the subquery. They have to be the subset of
* outerAttrs and are generally empty since these table arguments do not
* allow correlated references currently
* @param exprId expression ID of this subquery expression, generally generated afresh each time
* @param partitionByExpressions if non-empty, the TABLE argument included the PARTITION BY clause
* to indicate that the input relation should be repartitioned by the
Expand All @@ -67,30 +71,53 @@ import org.apache.spark.sql.types.DataType
case class FunctionTableSubqueryArgumentExpression(
plan: LogicalPlan,
outerAttrs: Seq[Expression] = Seq.empty,
outerScopeAttrs: Seq[Expression] = Seq.empty,
exprId: ExprId = NamedExpression.newExprId,
partitionByExpressions: Seq[Expression] = Seq.empty,
withSinglePartition: Boolean = false,
orderByExpressions: Seq[SortOrder] = Seq.empty,
selectedInputExpressions: Seq[PythonUDTFSelectedExpression] = Seq.empty)
extends SubqueryExpression(plan, outerAttrs, exprId, Seq.empty, None) with Unevaluable {
extends SubqueryExpression(
plan,
outerAttrs,
outerScopeAttrs,
exprId,
Seq.empty,
None
) with Unevaluable {

assert(!(withSinglePartition && partitionByExpressions.nonEmpty),
"WITH SINGLE PARTITION is mutually exclusive with PARTITION BY")

override def dataType: DataType = plan.schema

override def nullable: Boolean = false

override def withNewPlan(plan: LogicalPlan): FunctionTableSubqueryArgumentExpression =
copy(plan = plan)

override def withNewOuterAttrs(outerAttrs: Seq[Expression])
: FunctionTableSubqueryArgumentExpression = copy(outerAttrs = outerAttrs)

override def hint: Option[HintInfo] = None

override def withNewHint(hint: Option[HintInfo]): FunctionTableSubqueryArgumentExpression =
copy()

override def withNewOuterScopeAttrs(
newOuterScopeAttrs: Seq[Expression]
): FunctionTableSubqueryArgumentExpression = {
validateOuterScopeAttrs(newOuterScopeAttrs)
copy(outerScopeAttrs = newOuterScopeAttrs)
}

override def toString: String = s"table-argument#${exprId.id} $conditionString"

override lazy val canonicalized: Expression = {
FunctionTableSubqueryArgumentExpression(
plan.canonicalized,
outerAttrs.map(_.canonicalized),
outerScopeAttrs.map(_.canonicalized),
ExprId(0),
partitionByExpressions,
withSinglePartition,
Expand All @@ -101,8 +128,13 @@ case class FunctionTableSubqueryArgumentExpression(
newChildren: IndexedSeq[Expression]): FunctionTableSubqueryArgumentExpression =
copy(outerAttrs = newChildren)

final override def nodePatternsInternal(): Seq[TreePattern] =
Seq(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION)
final override def nodePatternsInternal(): Seq[TreePattern] = {
if (outerScopeAttrs.isEmpty) {
Seq(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION)
} else {
Seq(NESTED_CORRELATED_SUBQUERY, FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION)
}
}

def hasRepartitioning: Boolean = withSinglePartition || partitionByExpressions.nonEmpty

Expand Down
Loading