Consider having polling an error represent the final Stream value #206
Comments
@alexcrichton and I debated this initially, and the two options are equally expressive, in that a given stream can be set up to behave either way in either approach (e.g. by using I tend to think that it's much more common for stream errors to be "fatal" than recoverable, so I agree with @carllerche that we've probably chosen the wrong default here. |
I also think that this is largely a question of what the combinators do. For example the TCP listener probably won't fuse itself once an error happens but rather will continue to keep accepting sockets. The combinators, however, would be able to assume that when an error happens or Sounds reasonable to switch to errors == "end of stream" |
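To picture what "errors == end of stream" would let a combinator assume, here is a minimal sketch. The `Async`, `Poll` and `Stream` definitions are simplified stand-ins written for this example, not the crate's real definitions, and `FuseOnError`, `Scripted` and `demo` are hypothetical names. The adapter drops the inner stream after the first error and reports end of stream from then on.

```rust
use std::collections::VecDeque;

// Simplified stand-ins for the library's polling types (assumption: the real
// definitions differ in detail).
#[derive(Debug, PartialEq)]
pub enum Async<T> {
    Ready(T),
    NotReady,
}

pub type Poll<T, E> = Result<Async<T>, E>;

pub trait Stream {
    type Item;
    type Error;
    // Ok(Ready(Some(x))) = item, Ok(Ready(None)) = end of stream, Err(e) = error.
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}

/// Hypothetical adapter: once the inner stream errors or ends, it is dropped
/// and every later poll reports end of stream.
pub struct FuseOnError<S> {
    inner: Option<S>,
}

impl<S> FuseOnError<S> {
    pub fn new(inner: S) -> Self {
        FuseOnError { inner: Some(inner) }
    }
}

impl<S: Stream> Stream for FuseOnError<S> {
    type Item = S::Item;
    type Error = S::Error;

    fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
        let result = match self.inner.as_mut() {
            Some(stream) => stream.poll(),
            // Already terminated: never touch the inner stream again.
            None => return Ok(Async::Ready(None)),
        };
        // Treat both a normal end and an error as terminal.
        if matches!(result, Ok(Async::Ready(None)) | Err(_)) {
            self.inner = None;
        }
        result
    }
}

/// Demo helper: replays a fixed script of poll results.
pub struct Scripted {
    steps: VecDeque<Result<Option<u32>, &'static str>>,
}

impl Stream for Scripted {
    type Item = u32;
    type Error = &'static str;

    fn poll(&mut self) -> Poll<Option<u32>, &'static str> {
        match self.steps.pop_front() {
            Some(Ok(item)) => Ok(Async::Ready(item)),
            Some(Err(e)) => Err(e),
            None => Ok(Async::Ready(None)),
        }
    }
}

/// Polls the fused stream four times and records each result.
pub fn demo() -> Vec<Poll<Option<u32>, &'static str>> {
    let script: Vec<Result<Option<u32>, &'static str>> =
        vec![Ok(Some(1)), Err("boom"), Ok(Some(2))];
    let mut fused = FuseOnError::new(Scripted { steps: VecDeque::from(script) });
    (0..4).map(|_| fused.poll()).collect()
}
```

Calling `demo()` from a `main` shows that the item scripted after the error is never observed. A source like the TCP listener mentioned above would not be wrapped this way if it is meant to keep accepting sockets.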
Terminating a stream on an error would simplify the life of implementers of the The main question seems to be indeed, what combinators would do in that case. |
In the world with streams terminating on an error, would there still be a place They could recover from one error, but what next? Their only option seems to be This seems to be a weak point of stopping on errors. Error recovery is |
Hm yeah that's a very good point about the combinators. I think this would remove basically all of the "chaining" combinators like
That... may slightly change my opinion here. @carllerche @aturon thoughts about the effect on these chaining combinators with streams? |
IMO this is an argument in favor of having the There are two different error classes with streams:
The current behavior of stream errors implies the first category, which makes the second class of errors difficult to model. If, however, the stream error terminates the stream, this implies that a stream error falls under group #2.
This lets you differentiate between producing an error value, and an error in value production (you can have potentially two different error types). As pointed out, combinators like
For example would provide similar behavior as the current |
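The distinction between "producing an error value" and "an error in value production" can be sketched with two separate error types. Everything here is an assumption made for illustration: the polling types are simplified stand-ins, and `Lines`, `ParseError`, `ConnectionLost`, `drain` and `demo` are hypothetical names. Per-item failures travel inside the item as a `Result`, while the stream's own error type is reserved for the failure that ends the stream.

```rust
use std::collections::VecDeque;

// Simplified stand-ins for the library's polling types (assumption).
#[derive(Debug, PartialEq)]
pub enum Async<T> {
    Ready(T),
    NotReady,
}

pub type Poll<T, E> = Result<Async<T>, E>;

pub trait Stream {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}

/// Recoverable, per-item error: one bad line does not end the stream.
#[derive(Debug, PartialEq)]
pub struct ParseError(pub String);

/// Fatal error in value production: nothing more can be read.
#[derive(Debug, PartialEq)]
pub struct ConnectionLost;

/// Hypothetical source; a `None` entry simulates the connection dropping.
pub struct Lines {
    input: VecDeque<Option<&'static str>>,
}

impl Stream for Lines {
    type Item = Result<i32, ParseError>;
    type Error = ConnectionLost;

    fn poll(&mut self) -> Poll<Option<Self::Item>, ConnectionLost> {
        match self.input.pop_front() {
            Some(Some(line)) => {
                let parsed = line
                    .parse::<i32>()
                    .map_err(|_| ParseError(line.to_string()));
                Ok(Async::Ready(Some(parsed)))
            }
            Some(None) => Err(ConnectionLost),
            None => Ok(Async::Ready(None)),
        }
    }
}

/// Collects items until the stream ends or fails; a stream error is final here.
pub fn drain<S: Stream>(mut stream: S) -> (Vec<S::Item>, Option<S::Error>) {
    let mut items = Vec::new();
    loop {
        match stream.poll() {
            Ok(Async::Ready(Some(item))) => items.push(item),
            Ok(Async::Ready(None)) => return (items, None),
            // A real task would yield here; the demo source never returns NotReady.
            Ok(Async::NotReady) => continue,
            Err(e) => return (items, Some(e)),
        }
    }
}

pub fn demo() -> (Vec<Result<i32, ParseError>>, Option<ConnectionLost>) {
    let input = VecDeque::from(vec![Some("1"), Some("x"), Some("3"), None, Some("4")]);
    drain(Lines { input })
}
```

In this sketch the unparsable line shows up as an `Err` item and polling continues, whereas the lost connection stops the consumer before the last line is read.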
I would also favour the approach of termination of stream on errors. I find that an error which terminates the stream is much more common than one which is an "expected value" and is recoverable from. (FYI Rx also does this and it has error handling "operators" if you want to check how |
Hm I wonder if we'd perhaps have:
fn and_then<F, R>(self, f: F) -> impl Stream<Result<R::Item, R::Error>, Self::Error>
    where F: FnMut(Self::Item) -> R,
          R: Future,
That is, @carllerche I agree that if we had these semantics then a TCP listener would be that form of a stream. I find I'm still a little uneasy about the composition story though, it doesn't feel quite right... |
You could do something like this, which would have the additional benefit of allowing different error types for the different categories of errors:
fn and_then<F, T, E, U, R>(self, f: F) -> impl Stream<Result<U, E>, Self::Error>
    where Self: Stream<Item = Result<T, E>> + Sized,
          F: FnMut(T) -> R,
          R: IntoFuture<Item = Result<U, E>, Error = Self::Error>
{
    // ...
}
... it is a lot of generics though... |
I think this PR is relevant to the discussion: https://github.com/alexcrichton/futures-rs/pull/199/files Aka, if an error is passed through without terminating the stream it is unclear if it should count as an element or not. |
If we go with this (which it seems like we will) we should update |
Shouldn't this be encoded by type? Currently Maybe we could have two traits (names invented just for now):
enum StreamAsync<S: FallibleStream> {
Ready(S::Item, S),
NotReady(S),
}
trait FallibleStream {
type Item;
type Error;
fn poll(self) -> Result<StreamAsync<Self>, Self::Error>;
// combinators
}
trait InfallibleStream {
type Item;
// We don't need error here.
fn poll(&mut self) -> Async<Self::Item>;
}
// This bridges the two:
impl<S: FallibleStream> InfallibleStream for Result<S, S::Error> {
type Item = Result<S::Item, S::Error>;
fn poll(&mut self) -> Async<Self::Item> {
// Too lazy to write actual code, but the idea is to replace Self::Ok with Err if stream fails.
}
}
I admit it'd somewhat complicate the API, but it might express the constraints more clearly and it'd be checked by the compiler. What do you think? |
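The bridge that the comment leaves unwritten could look roughly like the following sketch. It departs from the proposal in ways that are assumptions of this example: a hypothetical `Bridge` wrapper holding an `Option<S>` replaces the `Result<S, S::Error>` receiver, because a by-value `poll` has to move the stream out of `&mut self`; a `Sized` bound is added to the trait; and polling after a failure panics. `Countdown` and `demo` are likewise made up for the demonstration.

```rust
// The two traits from the proposal above (names were invented there).
pub enum StreamAsync<S: FallibleStream> {
    Ready(S::Item, S),
    NotReady(S),
}

pub trait FallibleStream: Sized {
    type Item;
    type Error;
    // Consumes the stream; it is handed back only if it did not fail.
    fn poll(self) -> Result<StreamAsync<Self>, Self::Error>;
}

pub enum Async<T> {
    Ready(T),
    NotReady,
}

pub trait InfallibleStream {
    type Item;
    fn poll(&mut self) -> Async<Self::Item>;
}

/// Hypothetical wrapper; `None` means the inner stream has already failed.
pub struct Bridge<S> {
    state: Option<S>,
}

impl<S: FallibleStream> InfallibleStream for Bridge<S> {
    type Item = Result<S::Item, S::Error>;

    fn poll(&mut self) -> Async<Self::Item> {
        // Move the stream out, since the by-value poll consumes it.
        let stream = self.state.take().expect("polled after the stream failed");
        match stream.poll() {
            Ok(StreamAsync::Ready(item, next)) => {
                self.state = Some(next);
                Async::Ready(Ok(item))
            }
            Ok(StreamAsync::NotReady(next)) => {
                self.state = Some(next);
                Async::NotReady
            }
            // The stream is gone; surface the error as the final item.
            Err(e) => Async::Ready(Err(e)),
        }
    }
}

/// Demo stream: yields n, n-1, ..., 1 and then fails.
pub struct Countdown(pub u32);

impl FallibleStream for Countdown {
    type Item = u32;
    type Error = &'static str;

    fn poll(self) -> Result<StreamAsync<Self>, Self::Error> {
        if self.0 == 0 {
            Err("exhausted")
        } else {
            Ok(StreamAsync::Ready(self.0, Countdown(self.0 - 1)))
        }
    }
}

pub fn demo() -> Vec<Result<u32, &'static str>> {
    let mut bridge = Bridge { state: Some(Countdown(2)) };
    let mut out = Vec::new();
    for _ in 0..3 {
        if let Async::Ready(result) = bridge.poll() {
            out.push(result);
        }
    }
    out
}
```

The `take()` and re-store steps in `Bridge::poll` give a small taste of the value shuffling that a consuming `poll` tends to require whenever a stream is kept inside another structure.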
@Kixunil yes that was considered long ago in the initial early design stages of this library, but unfortunately it suffers a few problems.
|
Good points. What exactly do you mean by "With so many types always moving there's lots of variable binding and inability to store values in structures."? I can't quite picture what you mean.
Edit: Looking at the ASM of a simple example it seems like it doesn't... |
It basically means that every function consumes the receiver and then optionally returns it. That means that if you try to store partial state in a |
Thank you! With all these problems, I prefer to keep the semantics as is. Maybe provide some other interface to specify whether an error is final. (e.g. |
@Kixunil could you explain why you would like the semantics to stay as is? If the semantics are changed, you can get the original semantics by having a stream over Result. |
@carllerche I think a mutable reference expresses "the value will stay valid". If an error indicated end of stream, what should happen when poll is called again? Panic? Return the same error? Keeping it this way has another advantage of not breaking existing code. :) Also, one interesting example would be writing to a file. The write can fail because of a full disk but later it can succeed. On the other hand, I can imagine a fatal error. I'd prefer if the semantics could be defined by the implementor. Just single I'd love it if the constraint could be encoded in the type system somehow. To give an example, I dislike the way |
I've played with streams in other languages, and found the idea (in this library) of "errors that don't terminate the stream" odd. Not bad, just odd. I agree with the comments above that you could get the same effect with a stream of |
@robey we have these cases:
Same with Sink. How would you model all of these? |
@Kixunil I think @alexcrichton's idea from here would let all three cases work: #206 (comment) The first two cases would be the new behavior, and the third case would be handled by making the stream be explicitly of type |
@robey That sounds reasonable. The problem I see there is that someone could accidentally mistake one for the other, but that is probably unavoidable. In any case, I think my comment still holds: we should specify exactly what should happen when someone calls |
After implementing my own transport for use with tokio-proto, I agree that having an error terminate the stream makes the most sense, and that as @robey said, a non-fatal error can be handled with Regarding @Kixunil's question about I understand the concern about the performance cost of the moves, but a very basic attempt seems to indicate that the necessary code changes wouldn't be that onerous. As an experiment I created a gist containing an implementation of |
@living180 Thank you for considering my suggestion! I'd like to revisit it since I got into this very recently. I'm implementing an application where correctness > speed (but still it needs to be fast enough). This is related to
fn poll(&mut self) -> Poll<...> {
    match self.bus.take() {
        Some(bus) => match bus.poll() {
            Ok(Async::Ready(result)) => Ok(Async::Ready(MyNewFuture::new(result, bus))),
            Ok(Async::NotReady) => {
                self.bus = Some(bus);
                Ok(Async::NotReady)
            }
            Err(e) => Err(e)
        },
        None => panic!("poll() called twice")
    }
}
This is weird, involves copying from/to |
@living180 note that using I do agree though that implementations would be easier with |
Above point is expressed around 13:20 in a C++ talk about observables. Judgment about error is done downstream, at the most opportune and clean place. Again, clean code theme is here. |
For the record, I have changed my mind since I opened this issue and I believe that the current behavior is the best. |
@carllerche why? However short your comment is, I cannot read your mind, and I'd love for you to share the experience that made you change your mind. |
There can be a middle ground here!
|
A stream returning a However when the resource underlying the stream returns an I believe I lean towards handling |
So to clarify, streams should be terminated (by the user), when the stream itself produces |
Consider the following real-life example from use of rx. Now 😄 the answer is not "read 3rd party code documentation". |
I thought about the form. How about the following formulation.
|
@3n-mb Very good question! I'm a big believer in the type system and I believe we should have different types/traits for different behaviors. That way the compiler can check our code. |
Hi, I'm not sure if this is relevant to the discussion as I'm VERY new to Rust and futures streams. Still, I wanted to mention Reactive Streams in case somebody finds it useful. |
Sounds like Reactive Streams deal with non-blocking backpressure. To what extent are tokio streams pull-based, i.e. do they include backpressure control, in contrast to observables (no built-in backpressure control)? |
@3n-mb I think this is off-topic. Streams are pull-based and sinks are push-based (but they still have backpressure). There's also the unbounded channel, which has a push-based non-blocking end. |
@3n-mb the linked overview is about java-streams, not reactive streams. BTW the reactive streams are now in Java 9 as java.util.concurrent.Flow. |
@luben I assume you're talking about this link; it compares the two:
@luben Yes! It is even in Java! |
@cramertj was there any particular decision here? |
This has been resolved. |
(the resolution was to not change the behaviour - a stream returning an error is not terminated and can yield further items or errors) |
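Under the semantics that were kept, a consumer cannot treat an error as the end of the stream. A minimal sketch of such a consumer follows; the polling types are simplified stand-ins rather than the crate's real definitions, and `Scripted`, `collect_all` and `demo` are hypothetical names used only here.

```rust
use std::collections::VecDeque;

// Simplified stand-ins for the library's polling types (assumption).
#[derive(Debug, PartialEq)]
pub enum Async<T> {
    Ready(T),
    NotReady,
}

pub type Poll<T, E> = Result<Async<T>, E>;

pub trait Stream {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}

/// Demo helper: replays a fixed script of poll results.
pub struct Scripted {
    steps: VecDeque<Result<Option<u32>, &'static str>>,
}

impl Stream for Scripted {
    type Item = u32;
    type Error = &'static str;

    fn poll(&mut self) -> Poll<Option<u32>, &'static str> {
        match self.steps.pop_front() {
            Some(Ok(item)) => Ok(Async::Ready(item)),
            Some(Err(e)) => Err(e),
            None => Ok(Async::Ready(None)),
        }
    }
}

/// Records every item and every error; only Ok(Ready(None)) stops the loop.
pub fn collect_all<S: Stream>(mut stream: S) -> Vec<Result<S::Item, S::Error>> {
    let mut out = Vec::new();
    loop {
        match stream.poll() {
            Ok(Async::Ready(Some(item))) => out.push(Ok(item)),
            Ok(Async::Ready(None)) => return out,
            // A real task would yield here; the demo source never returns NotReady.
            Ok(Async::NotReady) => continue,
            // An error is not the end: record it and keep polling.
            Err(e) => out.push(Err(e)),
        }
    }
}

pub fn demo() -> Vec<Result<u32, &'static str>> {
    let script: Vec<Result<Option<u32>, &'static str>> =
        vec![Ok(Some(1)), Err("boom"), Ok(Some(2))];
    collect_all(Scripted { steps: VecDeque::from(script) })
}
```

Here the item scripted after the error is still delivered. Whether to stop at the first error is left to the consumer, which matches the suggestion made earlier in the thread that termination be done by the user.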
Consider having polling an error represent the final value. In other words, a poll that returns an error means that poll should never be called again. In the case of a Stream, this means that an error indicates the stream has terminated. This issue is a placeholder for the associated discussion.
cc @aturon