
[SPARK-51265][SQL][SS] Throw proper error for eagerlyExecuteCommands containing streaming source marker #50015

Open · wants to merge 2 commits into master

Conversation

HeartSaVioR (Contributor):

What changes were proposed in this pull request?

This PR checks whether the logical plan contains a streaming source marker when eagerly executed commands are about to be executed. Here, a streaming source marker means a placeholder that will be materialized during microbatch planning. That is, if the plan has such a marker, the source is not materialized, hence the plan cannot read from that source.

This is easily triggered when the user constructs the plan for a command (e.g. df.write.saveAsTable) that includes df.readStream, either directly or via an indirect reference (a temp view over df.readStream).
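
For illustration, here is a minimal sketch of the problematic shape (the rate source and table names are illustrative; it mirrors the reproducer discussed in the review below):

val stream = spark.readStream.format("rate").load()
// Expose the streaming DataFrame through a temp view.
stream.createOrReplaceTempView("s")
// A batch command whose plan still contains an unmaterialized streaming
// source marker: before this PR it failed with a cryptic error.
spark.sql("CREATE TABLE output AS SELECT * FROM s")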

This should be caught by UnsupportedOperationChecker.checkForBatch (which is called from QueryExecution.assertSupported), but if the query is a command that is meant to be eagerly executed, an error is thrown before reaching that code path, and the error is cryptic (either a StackOverflowError, or an AnalysisException with INTERNAL_ERROR).

We should provide a proper error message to tell users that they have to fix their query.

Why are the changes needed?

Without the fix, a StackOverflowError or an AnalysisException with INTERNAL_ERROR is thrown for the user's faulty query.

Does this PR introduce any user-facing change?

Yes, we will provide a clearer error for this case (though the error class is still a TODO to be clarified).

How was this patch tested?

New UT.

Was this patch authored or co-authored using generative AI tooling?

No.

HeartSaVioR (Author):

cc. @cloud-fan @viirya Would you mind taking a look? Thanks!

p.foreach {
  case _: StreamingRelation | _: StreamingRelationV2 |
      _: StreamingExecutionRelation | _: StreamingDataSourceV2ScanRelation =>
    val msg = "Queries with streaming sources must be executed with writeStream.start()"
viirya (Member):
Does it have to be writeStream? Can it be readStream?

viirya (Member):
Oh, readStream cannot start a streaming query.

HeartSaVioR (Author):
Yeah, for now, every streaming query should be triggered via DataStreamWriter.start().
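
The diff hunk above is truncated by the review anchor; for context, here is a self-contained sketch of the shape of the check. The helper name is illustrative, the real PR throws a dedicated error class rather than this plain message, and the import paths may vary across Spark versions:

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2ScanRelation
import org.apache.spark.sql.execution.streaming.{StreamingExecutionRelation, StreamingRelation}

// Fail fast if the analyzed plan still contains a streaming source marker,
// i.e. a placeholder that would only be materialized during microbatch planning.
def assertNoStreamingSourceMarker(p: LogicalPlan): Unit = p.foreach {
  case _: StreamingRelation | _: StreamingRelationV2 |
      _: StreamingExecutionRelation | _: StreamingDataSourceV2ScanRelation =>
    throw new AnalysisException(
      "Queries with streaming sources must be executed with writeStream.start()")
  case _ => // batch nodes are fine
}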

viirya (Member) left a comment:

It looks like a more correct error for users.

// Since we are about to execute the plan, the plan shouldn't have a marker node to be
// materialized during microbatch planning. If the plan has a marker node, it is highly
// likely that users put streaming sources in a batch query.
// This case causes a problem before reaching the check in UnsupportedOperationChecker,
cloud-fan (Contributor):

I think the eager command execution is similar to a normal query execution, so why doesn't it hit UnsupportedOperationChecker?

viirya (Member):

Because assertSupported is not called for eagerly executed commands?

HeartSaVioR (Author):

This is actually coupled with "explain". The test in QueryExecutionSuite ends up with a StackOverflowError. I'll leave a PR comment explaining what happens in the test suite.


withTable("output") {
  val ex = intercept[AnalysisException] {
    // Creates a table from streaming source with batch query. This should fail.
    spark.sql("CREATE TABLE output AS SELECT * FROM s")
HeartSaVioR (Author) commented on Feb 20, 2025:

So when this query is executed, the following happens:

1. CreateDataSourceTableAsSelectCommand is executed. This reaches assertSupported, but the command is a leaf node and it hides the query, hence the assertion is a no-op.

2. It triggers InsertIntoHadoopFsRelationCommand. This exposes the query as a child, so we expect assertSupported to be triggered, but the problem happens while creating the "explainString" (planDesc).

3. When the query is determined to be streaming (any leaf node has isStreaming = true), Spark creates an IncrementalExecution (since the streaming-specific rules are defined there) to create the executed plan, which "disables" assertSupported(). This is not a bug, because we shouldn't check a streaming query against a batch query's criteria; it should have been checked against streaming criteria beforehand.

I'd say the two are simply in conflict: QueryExecution only works properly with batch queries, and IncrementalExecution only works properly with streaming queries. We just found a case where QueryExecution somehow receives a "streaming query" (at least from the isStreaming flag's perspective).

What happens then? withCachedData is called infinitely (I haven't dug into why it loops) and ends up with a StackOverflowError.

This is only the CTAS case, and there are lots of commands, so we can't check everything; I'd like to simply block the case where QueryExecution has to handle a "streaming query" (so far I have only got reports from commands, but I could be wrong).
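
For context on step 3 above, the "determined as streaming" classification comes from the leaves; a simplified sketch of the relevant behavior (the helper name is illustrative):

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// LogicalPlan.isStreaming defaults to children.exists(_.isStreaming), so a
// single readStream leaf marks the whole inner query of the CTAS as
// streaming, which is what routes planning toward IncrementalExecution.
def looksStreaming(plan: LogicalPlan): Boolean = plan.isStreaming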


cloud-fan (Contributor):

So before this PR, this test fails with stack overflow?

If the stack overflow issue is fixed, InsertIntoHadoopFsRelationCommand will hit UnsupportedOperationChecker and we are fine?

HeartSaVioR (Author) commented on Feb 20, 2025:

Yes, it went with a stack overflow, and for this case we might be OK.

Though I wouldn't assume this is the only case: this is a minimized reproducer of the report, and the original report ended up with an AnalysisException with INTERNAL_ERROR (it even came from a different command).

withTable("output") {
  val ex = intercept[AnalysisException] {
    // Creates a table from streaming source with batch query. This should fail.
    df.sparkSession.sql("CREATE TABLE output AS SELECT * FROM s")
HeartSaVioR (Author) commented on Feb 20, 2025:

Same reasoning as above, because queries in the foreachBatch (FEB) user function are technically batch queries.
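
To make the foreachBatch case concrete, a minimal sketch (the rate source, view name, and table name are illustrative, matching the test above):

import org.apache.spark.sql.DataFrame

// The user function receives a plain batch DataFrame, so any command it
// runs goes through eager command execution and is subject to the same check.
val query = spark.readStream.format("rate").load()
  .writeStream
  .foreachBatch { (df: DataFrame, batchId: Long) =>
    // Referencing the streaming temp view "s" from a batch command fails here.
    df.sparkSession.sql("CREATE TABLE output AS SELECT * FROM s")
    ()
  }
  .start()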

HeartSaVioR (Author) commented on Feb 20, 2025:

I realized the downside of the fix: if we ever plan to unify the logical plan for writes between batch and streaming (at least the DSv2 path), it wouldn't work. For now, the streaming write node is always either WriteToMicroBatchDataSource or WriteToMicroBatchDataSourceV1, so this change shouldn't be problematic.

Though I don't have a clear way to resolve what I commented above, so unless someone has a brilliant idea to resolve the coupling, maybe we need to live with it.

// That is more aggressive than just checking the marker node for streaming source which is
// yet to be materialized. We'd like to be a bit conservative here since this is the exact
// problematic case we figured out.
p.foreach {
cloud-fan (Contributor):

For a leaf node command like CreateDataSourceTableAsSelectCommand, this check won't work?

HeartSaVioR (Author) commented on Feb 20, 2025:

Yeah... LeafRunnableCommand doesn't seem to be something we can deal with in general (as there is no general interface to look at the underlying query). We can only deal with it when a LeafRunnableCommand runs another query (which they should).
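
A toy sketch of why a leaf command is opaque to the traversal (the command name is hypothetical; compare CreateDataSourceTableAsSelectCommand, which keeps its query as a field in the same way):

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.LeafRunnableCommand

// The inner query is a constructor field, not a child: plan.foreach walks
// children only, so it never visits a streaming leaf buried inside `query`.
case class DemoCtasLikeCommand(query: LogicalPlan) extends LeafRunnableCommand {
  // innerChildren makes the query show up in explain output, but it is
  // still invisible to child-based traversals like foreach.
  override def innerChildren: Seq[LogicalPlan] = Seq(query)
  override def run(sparkSession: SparkSession): Seq[Row] = Seq.empty
}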

viirya (Member) commented on Feb 20, 2025:

> I realized the downside of the fix: if we ever plan to unify the logical plan for writes between batch and streaming (at least the DSv2 path), it wouldn't work. For now, the streaming write node is always either WriteToMicroBatchDataSource or WriteToMicroBatchDataSourceV1, so this change shouldn't be problematic.

What does it mean? I see you don't check WriteToMicroBatchDataSource or WriteToMicroBatchDataSourceV1, but rather streaming relations like StreamingRelation, etc.

HeartSaVioR (Author):

@viirya
We have a TODO comment to apply the DSv2 write path to streaming queries as well, e.g. df.writeTo(tblName).append() uses the AppendData node. Not sure whether we will address this in the near future though (the TODO comment seems to have been there for years).

viirya (Member) commented on Feb 20, 2025:

> @viirya We have a TODO comment to apply the DSv2 write path to streaming queries as well, e.g. df.writeTo(tblName).append() uses the AppendData node. Not sure whether we will address this in the near future though (the TODO comment seems to have been there for years).

Do you mean after that (apply DSv2 write to streaming query), the fix won't work anymore?

HeartSaVioR (Author):

Depends on how we address it. Here, we assert when executing a command, and AppendData is AFAIK a command. So if we just make V2 commands work for streaming, valid streaming queries will be routed to the assertion path and we will incorrectly fail the query.

Though I think we may need a bigger change to address the TODO comment, so it's probably not something to worry about yet.

viirya (Member) commented on Feb 20, 2025:

I see. Yea, maybe it is too early to worry about it.

cloud-fan (Contributor):

I think the root cause is the command execution mode being set incorrectly in IncrementalExecution; see #50037.
