Unify the error handling for the RecordBatchStream #12641
Comments
I agree that inconsistent behavior is not good -- my understanding is that when a stream is cancelled / aborted it should stop execution immediately: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#cancellation--aborting-execution, so if you know of instances where that is not the case please let us know and I can file tickets to fix them.

However, that documentation does not describe the policy of "fast shutdown on error" -- I will try to clarify it with a new PR. I think many systems do want a quick shutdown if an error has occurred, and do NOT want to continue to poll other streams. The reason is that if the overall query will error anyway, any additional polling is wasted work that would be thrown away.
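For concreteness, here is a minimal sketch of what that fail-fast consumption looks like from the caller's side; `collect_fail_fast` is only an illustrative helper name, not a DataFusion API:

```rust
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;

/// Collect a stream with the "fail fast" policy: the first `Err`
/// stops polling immediately and is returned to the caller.
async fn collect_fail_fast(
    mut stream: SendableRecordBatchStream,
) -> Result<Vec<RecordBatch>> {
    let mut batches = Vec::new();
    while let Some(item) = stream.next().await {
        // `?` propagates the first error and ends the loop, so no further
        // (soon-to-be-discarded) work is done once the query is doomed.
        batches.push(item?);
    }
    Ok(batches)
}
```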
The use case of ignoring certain bad files and continuing to read other files makes sense to me for certain systems (though other systems would likely wish to abort immediately). To achieve this use case, instead of changing the default "shut down as quickly as possible on error" behavior, here are some other ideas
I tried to clarify error behavior in #12651
Thanks for your review. The example I mentioned above might be too small. I will try to compare these two proposals from two perspectives.

From the performance perspective, I agree with your proposal.

From the functional perspective, I think my proposal is more flexible. As long as the operator supports failure recovery, the poller of the root stream can decide whether to continue polling the stream based on the concrete error. If the user just wants to abort the stream after receiving the error, they can simply stop polling.
I agree with that, but I think it is worth paying a negligible cost for more flexibility 😊
Indeed -- I agree this is more flexible. From my perspective the core design question is where the error recovery is implemented.

One design, which I think you are proposing, is to have the pollers (the consumers) make the decision to keep polling. In order to support both fail-fast and error-recovery modes, I think we would have to
This would make it easier to implement execution plans that support error recovery, I think, but a tradeoff is that it requires changes to the implementations of all ExecutionPlans (both in the core and user defined) to check for / deal with error recovery.

An alternate design, which I hinted at, is to keep the error recovery within the ExecutionPlans that support it. In this mode, ExecutionPlans that can recover from errors do not return Err to their pollers. Instead they handle the error / recovery internally.

So in other words, I think you can support error recovery with how the current error handling is set up, by implementing error recovery in the operator where it is needed.
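A minimal sketch of this second design, assuming a hypothetical `recover_internally` helper: the operator decides what counts as recoverable and the poller never sees those errors. It re-wraps the filtered stream with the existing `RecordBatchStreamAdapter`:

```rust
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;

/// Wrap `input` so that recoverable errors are handled inside the operator
/// (here: logged and skipped) and never surface to the poller. A real
/// operator would decide which errors are recoverable and forward the rest.
fn recover_internally(
    schema: SchemaRef,
    input: SendableRecordBatchStream,
) -> SendableRecordBatchStream {
    let recovered = input.filter_map(|item| async move {
        match item {
            Ok(batch) => Some(Ok(batch)),
            Err(e) => {
                // Swallow the error and keep polling the input stream.
                eprintln!("recovered from error internally: {e}");
                None
            }
        }
    });
    Box::pin(RecordBatchStreamAdapter::new(schema, recovered))
}
```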
Yes, that's what I wanted to express.
Actually, we have already implemented it. The downside of this approach is that the poller cannot receive the error. Therefore, we have to analyze logs to verify whether there was an error with a given query. That is not a simple matter, especially for distributed execution.
I wonder if it would be possible to change your datasource to
🤔
I am very pleased that we have come up with the same solution. We have already implemented that as well 😂. The drawback of this approach is that we have to combine multiple errors into one error, and then the poller needs to separate the combined error back into the original errors and handle them differently. Therefore, the error handling is implemented in two places: our customized operator and the poller. Currently, we are migrating our

Finally, we came up with the proposed approach: let the pollers (the consumers) make the decision to keep polling. The centralized poller works like the executor in a push-based execution model. It is more flexible and simpler for the user.
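To illustrate the poller-driven idea, here is a sketch under the assumption that streams keep producing batches after yielding an `Err` (which is exactly the behavior change proposed here); `collect_with_recovery` and the `keep_going` predicate are hypothetical names:

```rust
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::DataFusionError;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;

/// The centralized poller: it inspects each error and decides whether the
/// query keeps running, collecting both partial results and the errors seen.
async fn collect_with_recovery(
    mut stream: SendableRecordBatchStream,
    keep_going: impl Fn(&DataFusionError) -> bool,
) -> (Vec<RecordBatch>, Vec<DataFusionError>) {
    let mut batches = Vec::new();
    let mut errors = Vec::new();
    while let Some(item) = stream.next().await {
        match item {
            Ok(batch) => batches.push(batch),
            // Recoverable (per the caller's policy): record it and keep polling.
            Err(e) if keep_going(&e) => errors.push(e),
            // Fatal: stop polling and return the partial result plus errors.
            Err(e) => {
                errors.push(e);
                break;
            }
        }
    }
    (batches, errors)
}
```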
This is a neat idea -- do I understand you correctly that the idea is that it is a wrapper around another
Yes
Is your feature request related to a problem or challenge?
Currently, most of the `RecordBatchStream`s generated by the `PhysicalPlan` could propagate the errors polled from the input and continue polling. However, some streams will stop polling the input stream when an error is encountered. The behavior is inconsistent.

Describe the solution you'd like
Unify the error handling for all of the `RecordBatchStream`s: continue polling the input stream when the input stream produces an error, and only the stream that encounters the error can stop itself.

The advantage of this solution is that we could decide whether to continue execution based on the type of error fetched from the root stream. For example, we may want to get the partial query result when data corruption happens in the `TableScan`. In this case, the error generated by the `TableScan` will be passed through all of the streams. As long as this `TableScan` can recover itself to produce the next `RecordBatch`, we can get both the error and the partial query result.
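A sketch of what this could look like on the scan side, assuming a hypothetical `read_file` helper and (for brevity) one batch per file: a corrupt file yields one `Err` item, and the stream then moves on to the next file instead of terminating.

```rust
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;

/// Hypothetical per-file reader; a corrupt file returns `Err`.
async fn read_file(path: String, schema: SchemaRef) -> Result<RecordBatch> {
    // ... open and decode `path` here ...
    let _ = path;
    Ok(RecordBatch::new_empty(schema))
}

/// Scan that recovers itself after a bad file: each file maps to one item,
/// so an `Err` for one file does not stop the batches from later files.
fn scan_files(files: Vec<String>, schema: SchemaRef) -> SendableRecordBatchStream {
    let per_file_schema = schema.clone();
    let batches = futures::stream::iter(files)
        .then(move |path| read_file(path, per_file_schema.clone()));
    Box::pin(RecordBatchStreamAdapter::new(schema, batches))
}
```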
Describe alternatives you've considered
No response
Additional context
No response