-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-51265][SQL][SS] Throw proper error for eagerlyExecuteCommands containing streaming source marker #50015
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,21 +30,23 @@ import org.apache.spark.internal.{Logging, MDC} | |
import org.apache.spark.internal.LogKeys.EXTENDED_EXPLAIN_GENERATOR | ||
import org.apache.spark.rdd.RDD | ||
import org.apache.spark.sql.{AnalysisException, ExtendedExplainGenerator, Row} | ||
import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker} | ||
import org.apache.spark.sql.catalyst.{ExtendedAnalysisException, InternalRow, QueryPlanningTracker} | ||
import org.apache.spark.sql.catalyst.analysis.{LazyExpression, UnsupportedOperationChecker} | ||
import org.apache.spark.sql.catalyst.expressions.codegen.ByteCodeStats | ||
import org.apache.spark.sql.catalyst.plans.QueryPlan | ||
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Command, CommandResult, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, ReturnAnswer, Union} | ||
import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule} | ||
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 | ||
import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat | ||
import org.apache.spark.sql.catalyst.util.truncatedString | ||
import org.apache.spark.sql.classic.SparkSession | ||
import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, InsertAdaptiveSparkPlan} | ||
import org.apache.spark.sql.execution.bucketing.{CoalesceBucketsInJoin, DisableUnnecessaryBucketedScan} | ||
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2ScanRelation | ||
import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters | ||
import org.apache.spark.sql.execution.exchange.EnsureRequirements | ||
import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery | ||
import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata, WatermarkPropagator} | ||
import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata, StreamingExecutionRelation, StreamingRelation, WatermarkPropagator} | ||
import org.apache.spark.sql.internal.SQLConf | ||
import org.apache.spark.sql.streaming.OutputMode | ||
import org.apache.spark.util.{LazyTry, Utils} | ||
|
@@ -128,11 +130,39 @@ class QueryExecution( | |
case _ => "command" | ||
} | ||
|
||
private def assertNoStreamSourceMarkerNode(p: LogicalPlan): Unit = { | ||
// In UnsupportedOperationChecker.checkForBatch, we check if the plan has any streaming node. | ||
// That is more aggressive than just checking the marker node for streaming source which is | ||
// yet to be materialized. We'd like to be a bit conservative here since this is the exact | ||
// problematic case we figured out. | ||
p.foreach { | ||
case _: StreamingRelation | _: StreamingRelationV2 | | ||
_: StreamingExecutionRelation | _: StreamingDataSourceV2ScanRelation => | ||
val msg = "Queries with streaming sources must be executed with writeStream.start()" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does it have to be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, for now, every streaming query should be triggered via |
||
// This is exactly the same with UnsupportedOperationChecker.checkForBatch. | ||
// TODO: Classify and issue a new error class, along with UnsupportedOperationChecker. | ||
throw new ExtendedAnalysisException( | ||
new AnalysisException( | ||
errorClass = "_LEGACY_ERROR_TEMP_3102", | ||
messageParameters = Map("msg" -> msg)), | ||
plan = p) | ||
|
||
case _ => | ||
} | ||
} | ||
|
||
private def eagerlyExecuteCommands(p: LogicalPlan) = { | ||
def eagerlyExecute( | ||
p: LogicalPlan, | ||
name: String, | ||
mode: CommandExecutionMode.Value): LogicalPlan = { | ||
// Since we are about to execute the plan, the plan shouldn't have a marker node to be | ||
// materialized during microbatch planning. If the plan has a marker node, it is highly | ||
// likely that users put streaming sources in a batch query. | ||
// This case brings problem before reaching the check in UnsupportedOperationChecker, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the eager command execution is similar to a normal query execution, why can't it hit There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is actually coupled with "explain". The test in QueryExecutionSuite ends up with StackOverflowError. I'll leave a PR comment what happens in the test suite. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
// (assertSupported), so we need to verify here manually. | ||
assertNoStreamSourceMarkerNode(p) | ||
|
||
// Since Command execution will eagerly take place here, | ||
// and in most cases be the bulk of time and effort, | ||
// with the rest of processing of the root plan being just outputting command results, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -422,6 +422,24 @@ class QueryExecutionSuite extends SharedSparkSession { | |
mockCallback.assertAnalyzed() | ||
} | ||
|
||
test("SPARK-51265 Running eagerlyExecuteCommand with streaming source should give an user " + | ||
"facing error") { | ||
withTempView("s") { | ||
val streamDf = spark.readStream.format("rate").load() | ||
streamDf.createOrReplaceTempView("s") | ||
withTable("output") { | ||
val ex = intercept[AnalysisException] { | ||
// Creates a table from streaming source with batch query. This should fail. | ||
spark.sql("CREATE TABLE output AS SELECT * FROM s") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So when this query is executed, following is happening: CreateDataSourceTableAsSelectCommand is executed. This reaches assertSupported, but it's a leaf node and it hides the query, hence the assertion is no-op. It triggers InsertIntoHadoopFsRelationCommand. This exposes the query as child so we expect assertSupported is triggered, but the problem happens on creating "explainString" (planDesc). When the query is determined as streaming (any leaf node is string), Spark creates IncrementalExecution (since there are streaming specific rules being defined there) to create executed plan, which "disabled" assertSupported(). This is not a bug, because we shouldn't check the streaming query with batch query's criteria. It should have been checked with streaming query's criteria before. I'd say it is just conflicted - QueryExecution only works properly with batch query, and IncrementalExecution only works properly with streaming query. It's just that we found a case where QueryExecution somehow receives the execution of "streaming query" (at least from isStreaming flag perspective). What happens? withCachedData is called infinitely (haven't followed about why it made a loop) and ended up with StackOverflowError. This is only a case of CTAS, and there are lots of commands which we can't check everything, so I'd like to simply block the case where QueryExecution has to handle "streaming query" (which I only got reports from commands, but I could be wrong). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @cloud-fan FYI There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So before this PR, this test fails with stack overflow? If the stack overflow issue is fixed, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it went with stack overflow, and for this case we might be OK. Thuogh I wouldn't assume this is the only case - this is a minimized reproducer of the report, and origin report was ended up with AnalysisException with INTERNAL_ERROR (it was even from different command). |
||
} | ||
assert( | ||
ex.getMessage.contains("Queries with streaming sources must be executed with " + | ||
"writeStream.start()") | ||
) | ||
} | ||
} | ||
} | ||
|
||
case class MockCallbackEagerCommand( | ||
var trackerAnalyzed: QueryPlanningTracker = null, | ||
var trackerReadyForExecution: QueryPlanningTracker = null) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -215,6 +215,46 @@ class ForeachBatchSinkSuite extends StreamTest { | |
assert(ex.getCause == sparkEx) | ||
} | ||
|
||
test("SPARK-51265 Running eagerlyExecuteCommand with streaming source in foreachBatch " + | ||
"should give an user facing error") { | ||
val mem = MemoryStream[Int] | ||
val ds = mem.toDS().map(_ + 1) | ||
|
||
def foreachBatchFn(df: Dataset[Int], batchId: Long): Unit = { | ||
withTempView("param", "s") { | ||
df.createOrReplaceTempView("param") | ||
val streamDf = df.sparkSession.readStream.format("rate").load() | ||
streamDf.createOrReplaceTempView("s") | ||
withTable("output") { | ||
val ex = intercept[AnalysisException] { | ||
// Creates a table from streaming source with batch query. This should fail. | ||
df.sparkSession.sql("CREATE TABLE output AS SELECT * FROM s") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same reasoning as above, because queries in FEB user function are technically batch queries. |
||
} | ||
assert( | ||
ex.getMessage.contains("Queries with streaming sources must be executed with " + | ||
"writeStream.start()") | ||
) | ||
|
||
// Creates a table from batch source (materialized RDD plan of streaming query). | ||
// This should be work properly. | ||
df.sparkSession.sql("CREATE TABLE output AS SELECT * from param") | ||
|
||
checkAnswer( | ||
df.sparkSession.sql("SELECT value FROM output"), | ||
Seq(Row(2), Row(3), Row(4), Row(5), Row(6))) | ||
} | ||
} | ||
} | ||
|
||
mem.addData(1, 2, 3, 4, 5) | ||
|
||
val query = ds.writeStream | ||
.trigger(Trigger.AvailableNow()) | ||
.foreachBatch(foreachBatchFn _) | ||
.start() | ||
query.awaitTermination() | ||
} | ||
|
||
// ============== Helper classes and methods ================= | ||
|
||
private class ForeachBatchTester[T: Encoder](memoryStream: MemoryStream[Int]) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for leaf node command like
CreateDataSourceTableAsSelectCommand
, this check won't work?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah... LeafRunnableCommand doesn't seem to be something we could deal with it in general (as there is no general interface to look at underlying query). We could only deal with it when LeafRunnableCommand runs another query (which they should).