Skip to content

Add futures::stream::unfold #209

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from Nov 10, 2016
Merged

Add futures::stream::unfold #209

merged 2 commits into from Nov 10, 2016

Conversation

ghost
Copy link

@ghost ghost commented Oct 15, 2016

This adds an adapter, a bit similar to Stream::fold, which allows to go from the "world of futures" to the "world of streams".

I felt such adapter was missing (or maybe I just missed it) when trying to use tokio-core, because most helper functions (like io::write_all or io::read_exact) return Futures and didn't work well with streaming protocols (using recursion with .and_then() finished by overflowing the stack).

This adapter allows to write code like:

fn handle_client<T: Read>(reader: T, handle: &Handle) -> Receiver<Vec<u8>> {
    let (tx,rx) = std::sync::mpsc::channel();
    let stream = repeat(reader, |reader| {
        // Read a length-prefixed message
        io::read_exact(reader, [0u8;8]).and_then(|(reader, buffer)| {
            let next_msg_size = buffer.read_u64::<LittleEndian>().unwrap() as usize;
            io::read_exact(reader, vec![0; next_msg_size])
        })
    });
    let future = stream.for_each(|message| {
        tx.send(message).unwrap();
    }).map_err(|e| println!("Error: {}", e));

    handle.spawn(future);
    rx
}

I am not too happy with the naming (repeat could mean something else) and with the example in the documentation, so I will be happy to change that.

We could also try to have a way to get back ownership of the internal state (right now it is possible by integrating it the Error type for example, but it looks ugly)

@alexcrichton
Copy link
Member

Thanks for the PR! I wonder if we could perhaps think of a different name for this? It seems similar to std::iter::repeat but prety different as well? Right now it looks similar-ish in functionality at least to Iterator::scan

@ghost
Copy link
Author

ghost commented Oct 30, 2016

My apologies for the delay, I just realized that I actually forgot to send my reply to you ...

I agree the name isn't great because it is close to other functions with different semantics.
I tried to see if I could find a similar functionality in other programming languages for inspiration, but I didn't find anything.

In essence, this functions creates a Stream out of thin air by repeating the same action (future) and waiting for it to complete, hence my first name idea repeat. I had also thought of generate_stream, but I didn't find it good either. I will keep thinking about names, but if you have any suggestions please let me know.

Apart from the name, do you think the functionality offered is useful (and has a place in the futures crate)?

@alexcrichton
Copy link
Member

Oh no worries!

This feels similar to an infinite stream with a map or perhaps something that's just FnMut() -> Option<Future<T>>. Could you elaborate on why T is passed in explicitly? I'm also still a little undecided on the name, but I think we'd want to reserve the name repeat at the very least.

I'm also totally fine adding this to this crate, seems like quite appropriate functionality to me!

@ghost
Copy link
Author

ghost commented Oct 31, 2016

The idea was to be able to pass ownership of a (potentially unclonable) object (such as a socket). The user provides the initial T, that we give to the closure, which returns a Future. The returned Future must resolve in a tuple (T, Item). T is then fed back into the closure, and Item is yielded by the Stream.

I took this approach because it fits well with some tokio functions, like tokio::io::read_exact<A, T>(a: A, buf: T) which returns a future that will resolve to (a, buf), thus giving back ownership of the socket/buffer.

I think a major disadvantage is that it may result in another "level" of tuple, already commonly found in tokio. For example, if the user wants the stream to yield (A, B) tuples, the Future created by the closure will need to return a (T, (A, B)).

As you pointed out, another design could be not to pass that state around (and have FnMut() -> Option<Future<T>>), but this may force users to use Rc<RefCell<T>> or TaskRc<RefCell<T>> in places where it may not be necessary, going a bit further from zero-cost abstractions.

@alexcrichton
Copy link
Member

Hm the state here is a little different than read_exact though where you never get it back out. That is, the internal state is locked into the stream forever (not yielded as an item). In that sense it could just be captured and closed over by the FnMut, right?

That may require an Option in some places to deal with take and unwrap, but that's what this implementation is doing anyway. I don't think it'll require Rc and/or internal mutability.

@ghost
Copy link
Author

ghost commented Nov 2, 2016

But if the state was closed over by the FnMut (as an Option as you suggest) how would the Future returned by the FnMut put it back into the closure's environment?

I figured I would end up having something like (pseudocode):

struct Repeat {
    f: F,
    future: Future,  // Future contains a reference to a part of f environment
}

Which would be rejected by Rust (can't have self-referencing structures). This is why I made the future explicitely return back the ownership of T.

@alexcrichton
Copy link
Member

Aha! I see what you mean now, that is indeed quite subtle. I wonder if perhaps this is almost more worth it as a combinator rather than a base construction of a stream. That is, an and_then like combinator which plumbs state through the various futures and such. What do you think?

@ghost
Copy link
Author

ghost commented Nov 3, 2016

Yes, I guess that should work! I could then have the functionality I originally wanted with something like futures::stream::iter(std::iter::repeat(())).and_then_state(initial_state, |state, _| { ... });

I'll try to work on the implementation tonight (and try to find a good name, although I am very bad at naming).

As a side note, wouldn't your previous comment about the state possibly being captured in the closure's environment apply to the std::iter::scan() iterator as well?

@alexcrichton
Copy link
Member

I'll try to work on the implementation tonight (and try to find a good name, although I am very bad at naming).

Ok! We can figure out naming afterwards, that's fine.

As a side note, wouldn't your previous comment about the state possibly being captured in the closure's environment apply to the std::iter::scan() iterator as well?

Indeed! I was never a fan of scan as an iterator...

@ghost
Copy link
Author

ghost commented Nov 5, 2016

OK the implementation is mostly done and revealed an interesting edge-case that I overlooked before. In case of error returned by the user-returned future, we would lose the internal state with no way to get it back (the future should have returned it in the Ok() variant). So I chose the option of returning Ok(Async::Ready(None)) because it is the only sane thing to do. This edge-case would disappear if polling an error meant the end of the stream, as suggested in #206.

I also added a few tests.

The Travis failure looks unrelated to this PR.

@alexcrichton
Copy link
Member

@aturon you wouldn't happen to recognize this combinator would you? I can't quite put my finger on it but it seems like something we've seen before...

@aturon
Copy link
Member

aturon commented Nov 8, 2016

@alexcrichton This combinator is sometimes known as "unfold", where you take a seed and a transformation, and produce from it a stream of values.

@alexcrichton
Copy link
Member

Oh that sounds pretty nifty! @Vaelden want to add this as Stream::unfold and rename appropriately?

@ghost
Copy link
Author

ghost commented Nov 10, 2016

@aturon I would just like to double-check, because since I started this PR I changed the implementation but not the accompanying docs (which I will do ASAP): is "unfold" an adapter, taking a stream of values, a seed and a transformation and creating a new stream; or does it produce a stream from nothing (well ... seed + transformation)?

@aturon
Copy link
Member

aturon commented Nov 10, 2016

@Vaelden It produces a stream from nothing, just as you have it here. Here's a similar function in Haskell. You can think of it as the "reverse" of fold.

@alexcrichton
Copy link
Member

Ah ok so given that, @Vaelden I wonder if it might be best to go back to what you had originally, not as a combinator on the Stream trait? Sorry for the runaround!

@ghost ghost changed the title Add futures::stream::repeat Add futures::stream::unfold Nov 10, 2016
@ghost
Copy link
Author

ghost commented Nov 10, 2016

@aturon Thanks a lot for the link! It helped me a lot for the documentation.

@alexcrichton That's fine no worries! I changed the name, rewrote the doc and rebased on top of master so everything should be fine now (except the name of my base branch that I cannot change from repeat_adapter without creating another PR).

@ghost
Copy link
Author

ghost commented Nov 10, 2016

Oh and I also changed the order of the item in the tuple to match Haskell's implementation: the Future must return (item, next_state) instead of (next_state, item) as in the beginning of this PR.

@alexcrichton alexcrichton merged commit 352d91c into rust-lang:master Nov 10, 2016
@alexcrichton
Copy link
Member

Thanks @Vaelden!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants