[SPARK-51265][SQL][SS] Throw proper error for eagerlyExecuteCommands containing streaming source marker #50015

Status: Open. Wants to merge 2 commits into base: master.
File: QueryExecution.scala

@@ -30,21 +30,23 @@ import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys.EXTENDED_EXPLAIN_GENERATOR
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, ExtendedExplainGenerator, Row}
-import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
+import org.apache.spark.sql.catalyst.{ExtendedAnalysisException, InternalRow, QueryPlanningTracker}
 import org.apache.spark.sql.catalyst.analysis.{LazyExpression, UnsupportedOperationChecker}
 import org.apache.spark.sql.catalyst.expressions.codegen.ByteCodeStats
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Command, CommandResult, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, ReturnAnswer, Union}
 import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
+import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
 import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.classic.SparkSession
 import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, InsertAdaptiveSparkPlan}
 import org.apache.spark.sql.execution.bucketing.{CoalesceBucketsInJoin, DisableUnnecessaryBucketedScan}
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2ScanRelation
 import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters
 import org.apache.spark.sql.execution.exchange.EnsureRequirements
 import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery
-import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata, WatermarkPropagator}
+import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata, StreamingExecutionRelation, StreamingRelation, WatermarkPropagator}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.util.{LazyTry, Utils}
@@ -128,11 +130,39 @@ class QueryExecution(
     case _ => "command"
   }

+  private def assertNoStreamSourceMarkerNode(p: LogicalPlan): Unit = {
+    // In UnsupportedOperationChecker.checkForBatch, we check if the plan has any streaming node.
+    // That is more aggressive than just checking for the marker node of a streaming source that
+    // is yet to be materialized. We'd like to be a bit conservative here since this is the exact
+    // problematic case we figured out.
+    p.foreach {
[Comment by Contributor]
For a leaf node command like CreateDataSourceTableAsSelectCommand, this check won't work?

[Reply by @HeartSaVioR (Contributor, Author), Feb 20, 2025]
Yeah... LeafRunnableCommand doesn't seem to be something we can deal with in general (there is no general interface to look at the underlying query). We can only deal with it when LeafRunnableCommand runs another query (which it should).
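To make the leaf-command gap concrete, here is a minimal sketch (the command name and run body are hypothetical, not from this PR): because a LeafRunnableCommand keeps its query as a plain constructor field rather than a child plan node, a traversal such as p.foreach over the command plan never visits the streaming relation inside it.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.LeafRunnableCommand

// Hypothetical leaf command: `query` is a field, not a child plan node, so
// assertNoStreamSourceMarkerNode(command) sees only this leaf and finds no
// streaming marker, even if `query` scans a streaming source.
case class HiddenQueryCommand(query: LogicalPlan) extends LeafRunnableCommand {
  override def run(sparkSession: SparkSession): Seq[Row] = {
    // The embedded query only surfaces here, when the command executes it
    // (often by triggering another command, where the check can fire again).
    sparkSession.sessionState.executePlan(query).toRdd.count()
    Seq.empty
  }
}
```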

+      case _: StreamingRelation | _: StreamingRelationV2 |
+           _: StreamingExecutionRelation | _: StreamingDataSourceV2ScanRelation =>
+        val msg = "Queries with streaming sources must be executed with writeStream.start()"
[Comment by Member]
Does it have to be writeStream? Can it be readStream?

[Reply by Member]
Oh, readStream cannot start a streaming query.

[Reply by @HeartSaVioR (Contributor, Author)]
Yeah, for now, every streaming query should be triggered via DataStreamWriter.start().
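For reference, the only supported way to consume a streaming source today is through DataStreamWriter.start(); a minimal sketch (the console sink is just an illustrative choice):

```scala
val stream = spark.readStream.format("rate").load()

// Supported: start a streaming query via DataStreamWriter.start().
val query = stream.writeStream
  .format("console")
  .start()
query.awaitTermination()

// Not supported: batch-style execution over `stream`, such as a CTAS or any
// eagerly executed command referencing it; that is the case this PR turns
// into a proper AnalysisException.
```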

+        // This is exactly the same as UnsupportedOperationChecker.checkForBatch.
+        // TODO: Classify and issue a new error class, along with UnsupportedOperationChecker.
+        throw new ExtendedAnalysisException(
+          new AnalysisException(
+            errorClass = "_LEGACY_ERROR_TEMP_3102",
+            messageParameters = Map("msg" -> msg)),
+          plan = p)
+
+      case _ =>
+    }
+  }
+
   private def eagerlyExecuteCommands(p: LogicalPlan) = {
     def eagerlyExecute(
         p: LogicalPlan,
         name: String,
         mode: CommandExecutionMode.Value): LogicalPlan = {
+      // Since we are about to execute the plan, the plan shouldn't have a marker node to be
+      // materialized during microbatch planning. If the plan has a marker node, it is highly
+      // likely that users put streaming sources in a batch query.
+      // This case causes a problem before reaching the check in UnsupportedOperationChecker
[Comment by Contributor]
I think the eager command execution is similar to a normal query execution; why can't it hit UnsupportedOperationChecker?

[Reply by Member]
Because assertSupported is not called for an eagerly executed command?

[Reply by @HeartSaVioR (Contributor, Author)]
This is actually coupled with "explain". The test in QueryExecutionSuite ends up with StackOverflowError. I'll leave a PR comment explaining what happens in the test suite.


+      // (assertSupported), so we need to verify it here manually.
+      assertNoStreamSourceMarkerNode(p)
+
       // Since Command execution will eagerly take place here,
       // and in most cases be the bulk of time and effort,
       // with the rest of processing of the root plan being just outputting command results,
File: QueryExecutionSuite.scala

@@ -422,6 +422,24 @@ class QueryExecutionSuite extends SharedSparkSession {
     mockCallback.assertAnalyzed()
   }

test("SPARK-51265 Running eagerlyExecuteCommand with streaming source should give an user " +
"facing error") {
withTempView("s") {
val streamDf = spark.readStream.format("rate").load()
streamDf.createOrReplaceTempView("s")
withTable("output") {
val ex = intercept[AnalysisException] {
// Creates a table from streaming source with batch query. This should fail.
spark.sql("CREATE TABLE output AS SELECT * FROM s")
[Comment by @HeartSaVioR (Contributor, Author), Feb 20, 2025]
So when this query is executed, the following happens:

CreateDataSourceTableAsSelectCommand is executed. This reaches assertSupported, but it's a leaf node and it hides the query, hence the assertion is a no-op.

It triggers InsertIntoHadoopFsRelationCommand. This exposes the query as a child, so we expect assertSupported to be triggered, but the problem happens while creating the "explainString" (planDesc).

When the query is determined to be streaming (any leaf node is streaming), Spark creates IncrementalExecution (since there are streaming-specific rules defined there) to create the executed plan, which "disables" assertSupported(). This is not a bug, because we shouldn't check a streaming query against a batch query's criteria. It should have been checked against a streaming query's criteria before this point.

I'd say the two are just in conflict - QueryExecution only works properly with a batch query, and IncrementalExecution only works properly with a streaming query. It's just that we found a case where QueryExecution somehow receives the execution of a "streaming query" (at least from the isStreaming flag perspective).

What happens? withCachedData is called infinitely (I haven't tracked down why it loops) and it ends up with StackOverflowError.

This is only the CTAS case, and there are lots of commands, so we can't check everything; I'd like to simply block the case where QueryExecution has to handle a "streaming query" (I have only received reports involving commands, but I could be wrong).
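The "determined to be streaming" step above refers to the plan's isStreaming flag, which is also exposed on Dataset; a quick illustration with public API:

```scala
val batchDf = spark.range(10).toDF()
val streamDf = spark.readStream.format("rate").load()

batchDf.isStreaming   // false: safe for QueryExecution to plan and execute
streamDf.isStreaming  // true: should only ever be planned by IncrementalExecution
```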

[Comment by Contributor]
So before this PR, this test fails with stack overflow?

If the stack overflow issue is fixed, InsertIntoHadoopFsRelationCommand will hit UnsupportedOperationChecker and we are fine?

[Reply by @HeartSaVioR (Contributor, Author), Feb 20, 2025]
Yes, it went with stack overflow, and for this case we might be OK.

Though I wouldn't assume this is the only case - this is a minimized reproducer of the report, and the original report ended up with an AnalysisException with INTERNAL_ERROR (it even came from a different command).

+        }
+        assert(
+          ex.getMessage.contains("Queries with streaming sources must be executed with " +
+            "writeStream.start()")
+        )
+      }
+    }
+  }

   case class MockCallbackEagerCommand(
       var trackerAnalyzed: QueryPlanningTracker = null,
       var trackerReadyForExecution: QueryPlanningTracker = null)
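For completeness, the streaming-native counterpart of the CTAS rejected above would go through DataStreamWriter.toTable; a sketch (the checkpoint path is hypothetical):

```scala
val streamDf = spark.readStream.format("rate").load()

// Write the stream into a table through a proper streaming query instead of
// running a batch CTAS over a streaming temp view.
val query = streamDf.writeStream
  .option("checkpointLocation", "/tmp/spark-51265-ckpt") // hypothetical path
  .toTable("output")
query.awaitTermination()
```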
File: ForeachBatchSinkSuite.scala

@@ -215,6 +215,46 @@ class ForeachBatchSinkSuite extends StreamTest {
     assert(ex.getCause == sparkEx)
   }

test("SPARK-51265 Running eagerlyExecuteCommand with streaming source in foreachBatch " +
"should give an user facing error") {
val mem = MemoryStream[Int]
val ds = mem.toDS().map(_ + 1)

def foreachBatchFn(df: Dataset[Int], batchId: Long): Unit = {
withTempView("param", "s") {
df.createOrReplaceTempView("param")
val streamDf = df.sparkSession.readStream.format("rate").load()
streamDf.createOrReplaceTempView("s")
withTable("output") {
val ex = intercept[AnalysisException] {
// Creates a table from streaming source with batch query. This should fail.
df.sparkSession.sql("CREATE TABLE output AS SELECT * FROM s")
[Comment by @HeartSaVioR (Contributor, Author), Feb 20, 2025]
Same reasoning as above, because queries in the foreachBatch user function are technically batch queries.

+          }
+          assert(
+            ex.getMessage.contains("Queries with streaming sources must be executed with " +
+              "writeStream.start()")
+          )
+
+          // Create a table from a batch source (the materialized RDD plan of the streaming
+          // query). This should work properly.
+          df.sparkSession.sql("CREATE TABLE output AS SELECT * from param")
+
+          checkAnswer(
+            df.sparkSession.sql("SELECT value FROM output"),
+            Seq(Row(2), Row(3), Row(4), Row(5), Row(6)))
+        }
+      }
+    }

+    mem.addData(1, 2, 3, 4, 5)
+
+    val query = ds.writeStream
+      .trigger(Trigger.AvailableNow())
+      .foreachBatch(foreachBatchFn _)
+      .start()
+    query.awaitTermination()
+  }

   // ============== Helper classes and methods =================

   private class ForeachBatchTester[T: Encoder](memoryStream: MemoryStream[Int]) {