Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify the error handling for the RecordBatchStream #12641

Closed
YjyJeff opened this issue Sep 27, 2024 · 9 comments
Closed

Unify the error handling for the RecordBatchStream #12641

YjyJeff opened this issue Sep 27, 2024 · 9 comments
Labels
enhancement New feature or request

Comments

@YjyJeff
Copy link
Contributor

YjyJeff commented Sep 27, 2024

Is your feature request related to a problem or challenge?

Currently, most of the RecordBatchStream generated by the PhysicalPlan could propagate the errors polled from the input and continue polling. However, some streams will stop polling the input stream when an error is encountered. The behavior is inconsistent

Describe the solution you'd like

Unify the error handling for all of the RecordBatchStream: Continue polling the input stream when the input stream produces the error and only the stream that encounters the error can stop itself.

The advantage of this solution is that: We could decide whether to continue execution based on the type of error fetched from the root stream. For example, we may want to get the partial query result when data corruption happens in the TableScan. In this case, the error generated by the TableScan will be passed through all of the streams. As long as this TableScan could recover it self to produce the next RecordBatch, we could get the error and the partial query result

Describe alternatives you've considered

No response

Additional context

No response

@alamb
Copy link
Contributor

alamb commented Sep 27, 2024

I agree that inconsistent behavior is not good -- my understanding is that when a stream is cancelled / aborted it should stop execution immediately: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#cancellation--aborting-execution so if you know of instances where that is not the case please let us know and I can file tickets to fix them

However, that documentation does not describe the policy of "fast shutdown on error" -- I will try and clarify it with a new PR.

I think many systems do want a quick shutdown if an error has occured and NOT continue to poll other streams. The reason is that if the overall query will error anyways, any additional polling is wasted work that would be thrown away.

For example, we may want to get the partial query result when data corruption happens in the TableScan. In this case, the error generated by the TableScan will be passed through all of the streams. As long as this TableScan could recover it self to produce the next RecordBatch, we could get the error and the partial query result

The usecase of ignoring certain bad files and continuing reading other files makes sense to me for certain systems (though other systems would likely wish to abort immediately)

To achieve this usecase, Instead of changing the default "shutdown as quickly as possible on error" behavior, here are some other ideas

  1. Change the table scan operator itself (if you are using a custom one) so that it does not emit an error to the rest of the plan
  2. Add some new ExecutionPlan operator that will "ignore" errors (as in not pass whatever error it gets to the output)

@alamb
Copy link
Contributor

alamb commented Sep 27, 2024

I tried to clarify error behavior in #12651

@YjyJeff
Copy link
Contributor Author

YjyJeff commented Sep 29, 2024

I tried to clarify error behavior in #12651

Thanks for your review. The example I mentioned above might be too small. I will try to compare these two proposals from two perspectives.

From the performance perspective, I agree with your proposal. Return Poll(None) immediately from the parent stream could avoid the cost of polling the child stream. However, compared to returning the Poll(None) in the stream that encounters the error, I think the cost is negligible. We only need to pay the overhead of calling the poll function for the number of streams called.

From the functional perspective, I think my proposal is more flexible. As long as the operator supports failure recovery, the poller of the root stream could decide to continue polling the stream or not based on the concrete error. If the user just wants to abort the stream after receiving the error(like using the try_collect method), no extra overhead will be paid because the poll_next function will not be called after an error is received.

Most of the systems want a quick shutdown if an error has occured and NOT continue to poll other streams.

Agree with it. But I think it is worth paying a negligible cost for more flexibility 😊

@alamb
Copy link
Contributor

alamb commented Sep 29, 2024

From the functional perspective, I think my proposal is more flexible. As long as the operator supports failure recovery, the poller of the root stream could decide to continue polling the stream or not based on the concrete error. If the user just wants to abort the stream after receiving the error(like using the try_collect method), no extra overhead will be paid because the poll_next function will not be called after an error is received.

Indeed -- I agree this is more flexible. From my perspective the core design question is where the error recovery is implemented

One design which I think you are proposing is to have the pollers (the consumers) make the decision to keep polling. In order to support both fail fast and error recovery modes I think we would have to

  1. Update all pollers (as you propose to start in this PR)
  2. Add some way for the operators to report if they support failure recovery (eg. ExecutionPlan::error_recovery())

This would make it easier to implement execution plans that supported error recovery I think, but a tradeoff is it requires changes to the implementations to all ExecutionPlans (both in the core and user defined) to check / deal with error recovery

An alternate design, which I hinted at, is to keep the error recovery within the ExecutionPlans that support it. In this mode, ExecutionPlans that can recover from errors do not return Err to their pollers. Instead they handle the error / recovery internally

So in other words, I think you can support error recovery given how the current error handling is setup by implementing error recovery in the operator it is needed

@YjyJeff
Copy link
Contributor Author

YjyJeff commented Sep 30, 2024

One design which I think you are proposing is to have the pollers (the consumers) make the decision to keep polling

Yes, that's what I wanted to express.

So in other words, I think you can support error recovery given how the current error handling is setup by implementing error recovery in the operator it is needed

Actually, we have already implemented it. The downside of this approach is that: The poller can not receive the error. Therefore, we have to analyze logs to verify whether there is an error with this query. It is not a simple matter, especially for the distributed execution.

@alamb
Copy link
Contributor

alamb commented Sep 30, 2024

Actually, we have already implemented it. The downside of this approach is that: The poller can not receive the error. Therefore, we have to analyze logs to verify whether there is an error with this query. It is not a simple matter, especially for the distributed execution.

I wonder if it would be possible to change your datasource to

  1. "remember" that it saw an error, but don't return it immediately -- save it for later
  2. poll all the other data sources to completion
  3. Once all the inputs were done, then return the saved Error

🤔

@YjyJeff
Copy link
Contributor Author

YjyJeff commented Oct 8, 2024

Actually, we have already implemented it. The downside of this approach is that: The poller can not receive the error. Therefore, we have to analyze logs to verify whether there is an error with this query. It is not a simple matter, especially for the distributed execution.

I wonder if it would be possible to change your datasource to

  1. "remember" that it saw an error, but don't return it immediately -- save it for later
  2. poll all the other data sources to completion
  3. Once all the inputs were done, then return the saved Error

🤔

I am very pleased that we have come up with the same solution. We also have already implemented it 😂. The drawback of this approach is that: We have to combine multiple errors into one error, and then the poller needs to separate the combined error back to the original errors and handle these errors differently. Therefore, the error handling is implemented in two places: our customized operator and the poller.

Currently, we are migrating our TableScan to parquet. Therefore, we also need to implement this logic for it, which means that every user who wants to use customized TableScan should implement this approach in its own TableScan. It is pretty tedious. Therefore, we also need a CollectErrorThenEmitStream wrapper to avoid the repeated logic. Users of the datafusion may have to use this stream in many places. When a user wants to modify the error handling logic, like removing this stream, many scattered codes should be modified.

Finally, we came up with the proposed approach: let the pollers (the consumers) make the decision to keep polling. The centralized poller works like the executor in the push based execution model. It is more flexible and simple for the user.

@alamb
Copy link
Contributor

alamb commented Oct 9, 2024

CollectErrorThenEmitStream wrapper to avoid the repeated logic. Users of the datafusion may have to use this stream in many places. When a user wants to modify the error handling logic, like removing this stream, many scattered codes should be modified.

This is a neat idea -- do I understand you correctly that the idea is it is a wrapper around another RecordBatchStream that will save (not emit) errors and only return them at the end (after the wrapped stream is done?)

@YjyJeff
Copy link
Contributor Author

YjyJeff commented Oct 10, 2024

CollectErrorThenEmitStream wrapper to avoid the repeated logic. Users of the datafusion may have to use this stream in many places. When a user wants to modify the error handling logic, like removing this stream, many scattered codes should be modified.

This is a neat idea -- do I understand you correctly that the idea is it is a wrapper around another RecordBatchStream that will save (not emit) errors and only return them at the end (after the wrapped stream is done?)

Yes

@YjyJeff YjyJeff closed this as completed Oct 11, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants