Skip to content

Commit 9c52450

Browse files
committed
Lift the error type out of Poll into a Result
That is: type Poll<T, E> = Result<Async<T>, E> enum Async<T> { Ready(T), NotReady, } This commit tweaks the return type of the `Future::poll` method to be more `try!`-friendly and also more amenable to `if let` for matching against readiness. Additionally, `Async` is used instead of `Option` to be more clear about what's actually happening, and it's also theoretically more extensible in the future if need be. Finally, the `try_poll!` macro has been removed while a new `try_ready!` macro has been added. This new macro has *different semantics* as it propagates both `NotReady` *and* errors to the caller. It only returns the `Async::Ready` variant. Closes #108
1 parent f3c5a66 commit 9c52450

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+417
-418
lines changed

futures-cpupool/src/lib.rs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
4848
use std::thread;
4949

5050
use crossbeam::sync::MsQueue;
51-
use futures::{IntoFuture, Future, oneshot, Oneshot, Complete, Poll};
51+
use futures::{IntoFuture, Future, oneshot, Oneshot, Complete, Poll, Async};
5252
use futures::task::{self, Run, Executor};
5353

5454
/// A thread pool intended to run CPU intensive work.
@@ -223,11 +223,11 @@ impl<T: Send + 'static, E: Send + 'static> Future for CpuFuture<T, E> {
223223
type Error = E;
224224

225225
fn poll(&mut self) -> Poll<T, E> {
226-
match self.inner.poll() {
227-
Poll::Ok(Ok(res)) => res.into(),
228-
Poll::Ok(Err(e)) => panic::resume_unwind(e),
229-
Poll::Err(_) => panic!("shouldn't be canceled"),
230-
Poll::NotReady => Poll::NotReady,
226+
match self.inner.poll().expect("shouldn't be canceled") {
227+
Async::Ready(Ok(Ok(e))) => Ok(e.into()),
228+
Async::Ready(Ok(Err(e))) => Err(e),
229+
Async::Ready(Err(e)) => panic::resume_unwind(e),
230+
Async::NotReady => Ok(Async::NotReady),
231231
}
232232
}
233233
}
@@ -237,13 +237,17 @@ impl<F: Future> Future for Sender<F, Result<F::Item, F::Error>> {
237237
type Error = ();
238238

239239
fn poll(&mut self) -> Poll<(), ()> {
240-
if let Poll::Ok(_) = self.tx.as_mut().unwrap().poll_cancel() {
240+
if let Ok(Async::Ready(_)) = self.tx.as_mut().unwrap().poll_cancel() {
241241
// Cancelled, bail out
242-
return Poll::Ok(());
242+
return Ok(().into())
243243
}
244244

245-
let res = try_poll!(self.fut.poll());
245+
let res = match self.fut.poll() {
246+
Ok(Async::Ready(e)) => Ok(e),
247+
Ok(Async::NotReady) => return Ok(Async::NotReady),
248+
Err(e) => Err(e),
249+
};
246250
self.tx.take().unwrap().complete(res);
247-
Poll::Ok(())
251+
Ok(Async::Ready(()))
248252
}
249253
}

src/catch_unwind.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
use {Future, Poll};
2-
31
use std::prelude::v1::*;
42
use std::any::Any;
53
use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe};
64

5+
use {Future, Poll, Async};
6+
77
/// Future for the `catch_unwind` combinator.
88
///
99
/// This is created by this `Future::catch_unwind` method.
@@ -27,14 +27,14 @@ impl<F> Future for CatchUnwind<F>
2727

2828
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
2929
let mut future = self.future.take().expect("cannot poll twice");
30-
match catch_unwind(|| (future.poll(), future)) {
31-
Ok((Poll::NotReady, f)) => {
32-
self.future = Some(f);
33-
Poll::NotReady
30+
let (res, future) = try!(catch_unwind(|| (future.poll(), future)));
31+
match res {
32+
Ok(Async::NotReady) => {
33+
self.future = Some(future);
34+
Ok(Async::NotReady)
3435
}
35-
Ok((Poll::Ok(v), _)) => Poll::Ok(Ok(v)),
36-
Ok((Poll::Err(e), _)) => Poll::Ok(Err(e)),
37-
Err(e) => Poll::Err(e),
36+
Ok(Async::Ready(t)) => Ok(Async::Ready(Ok(t))),
37+
Err(e) => Ok(Async::Ready(Err(e))),
3838
}
3939
}
4040
}

src/chain.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use core::mem;
22

3-
use {Future, Poll};
3+
use {Future, Poll, Async};
44

55
pub enum Chain<A, B, C> where A: Future {
66
First(A, C),
@@ -21,22 +21,27 @@ impl<A, B, C> Chain<A, B, C>
2121
-> Result<Result<B::Item, B>, B::Error>,
2222
{
2323
let a_result = match *self {
24-
Chain::First(ref mut a, _) => try_poll!(a.poll()),
24+
Chain::First(ref mut a, _) => {
25+
match a.poll() {
26+
Ok(Async::NotReady) => return Ok(Async::NotReady),
27+
Ok(Async::Ready(t)) => Ok(t),
28+
Err(e) => Err(e),
29+
}
30+
}
2531
Chain::Second(ref mut b) => return b.poll(),
2632
Chain::Done => panic!("cannot poll a chained future twice"),
2733
};
2834
let data = match mem::replace(self, Chain::Done) {
2935
Chain::First(_, c) => c,
3036
_ => panic!(),
3137
};
32-
match f(a_result, data) {
33-
Ok(Ok(e)) => Poll::Ok(e),
34-
Ok(Err(mut b)) => {
38+
match try!(f(a_result, data)) {
39+
Ok(e) => Ok(Async::Ready(e)),
40+
Err(mut b) => {
3541
let ret = b.poll();
3642
*self = Chain::Second(b);
3743
ret
3844
}
39-
Err(e) => Poll::Err(e),
4045
}
4146
}
4247
}

src/collect.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::prelude::v1::*;
22

33
use std::mem;
44

5-
use {Future, IntoFuture, Poll};
5+
use {Future, IntoFuture, Poll, Async};
66

77
/// A future which takes a list of futures and resolves with a vector of the
88
/// completed values.
@@ -77,8 +77,9 @@ impl<I> Future for Collect<I>
7777
loop {
7878
match self.cur {
7979
Some(ref mut cur) => {
80-
match try_poll!(cur.poll()) {
81-
Ok(e) => self.result.push(e),
80+
match cur.poll() {
81+
Ok(Async::Ready(e)) => self.result.push(e),
82+
Ok(Async::NotReady) => return Ok(Async::NotReady),
8283

8384
// If we hit an error, drop all our associated resources
8485
// ASAP.
@@ -89,12 +90,12 @@ impl<I> Future for Collect<I>
8990
for f in self.result.drain(..) {
9091
drop(f);
9192
}
92-
return Poll::Err(e)
93+
return Err(e)
9394
}
9495
}
9596
}
9697
None => {
97-
return Poll::Ok(mem::replace(&mut self.result, Vec::new()))
98+
return Ok(Async::Ready(mem::replace(&mut self.result, Vec::new())))
9899
}
99100
}
100101

src/done.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use {Future, Poll};
1+
use {Future, Poll, Async};
22

33
/// A future representing a value that is immediately ready.
44
///
@@ -30,6 +30,6 @@ impl<T, E> Future for Done<T, E> {
3030
type Error = E;
3131

3232
fn poll(&mut self) -> Poll<T, E> {
33-
self.inner.take().expect("cannot poll Done twice").into()
33+
self.inner.take().expect("cannot poll Done twice").map(Async::Ready)
3434
}
3535
}

src/empty.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use core::marker;
22

3-
use {Future, Poll};
3+
use {Future, Poll, Async};
44

55
/// A future which is never resolved.
66
///
@@ -12,7 +12,7 @@ pub struct Empty<T, E> {
1212
/// Creates a future which never resolves, representing a computation that never
1313
/// finishes.
1414
///
15-
/// The returned future will forever return `Poll::NotReady`.
15+
/// The returned future will forever return `Async::NotReady`.
1616
pub fn empty<T, E>() -> Empty<T, E> {
1717
Empty { _data: marker::PhantomData }
1818
}
@@ -22,6 +22,6 @@ impl<T, E> Future for Empty<T, E> {
2222
type Error = E;
2323

2424
fn poll(&mut self) -> Poll<T, E> {
25-
Poll::NotReady
25+
Ok(Async::NotReady)
2626
}
2727
}

src/failed.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,6 @@ impl<T, E> Future for Failed<T, E> {
3131
type Error = E;
3232

3333
fn poll(&mut self) -> Poll<T, E> {
34-
Poll::Err(self.e.take().expect("cannot poll Failed twice"))
34+
Err(self.e.take().expect("cannot poll Failed twice"))
3535
}
3636
}

src/finished.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use core::marker;
22

3-
use {Future, Poll};
3+
use {Future, Poll, Async};
44

55
/// A future representing a finished successful computation.
66
///
@@ -33,6 +33,6 @@ impl<T, E> Future for Finished<T, E> {
3333

3434

3535
fn poll(&mut self) -> Poll<T, E> {
36-
Poll::Ok(self.t.take().expect("cannot poll Finished twice"))
36+
Ok(Async::Ready(self.t.take().expect("cannot poll Finished twice")))
3737
}
3838
}

src/fuse.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
use {Future, Poll};
1+
use {Future, Poll, Async};
22

33
/// A future which "fuse"s a future once it's been resolved.
44
///
55
/// Normally futures can behave unpredictable once they're used after a future
6-
/// has been resolved, but `Fuse` is always defined to return `Poll::NotReady`
6+
/// has been resolved, but `Fuse` is always defined to return `Async::NotReady`
77
/// from `poll` after it has succeeded, and after it has succeeded all future
88
/// calls to `schedule` will be ignored.
99
pub struct Fuse<A: Future> {
@@ -21,10 +21,14 @@ impl<A: Future> Future for Fuse<A> {
2121
type Error = A::Error;
2222

2323
fn poll(&mut self) -> Poll<A::Item, A::Error> {
24-
let ret = self.future.as_mut().map(|f| f.poll());
25-
if ret.as_ref().map(|r| r.is_ready()) == Some(true) {
26-
self.future = None;
24+
let res = self.future.as_mut().map(|f| f.poll());
25+
match res.unwrap_or(Ok(Async::NotReady)) {
26+
res @ Ok(Async::Ready(_)) |
27+
res @ Err(_) => {
28+
self.future = None;
29+
return res
30+
}
31+
Ok(Async::NotReady) => Ok(Async::NotReady)
2732
}
28-
ret.unwrap_or(Poll::NotReady)
2933
}
3034
}

src/join.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
use core::mem;
44

5-
use {Future, Poll, IntoFuture};
5+
use {Future, Poll, IntoFuture, Async};
66

77
macro_rules! generate {
88
($(
@@ -50,23 +50,23 @@ macro_rules! generate {
5050
Ok(done) => done,
5151
Err(e) => {
5252
self.erase();
53-
return Poll::Err(e)
53+
return Err(e)
5454
}
5555
};
5656
$(
5757
all_done = match self.$B.poll() {
5858
Ok(done) => all_done && done,
5959
Err(e) => {
6060
self.erase();
61-
return Poll::Err(e)
61+
return Err(e)
6262
}
6363
};
6464
)*
6565

6666
if all_done {
67-
Poll::Ok((self.a.take(), $(self.$B.take()),*))
67+
Ok(Async::Ready((self.a.take(), $(self.$B.take()),*)))
6868
} else {
69-
Poll::NotReady
69+
Ok(Async::NotReady)
7070
}
7171
}
7272
}
@@ -131,17 +131,16 @@ enum MaybeDone<A: Future> {
131131
impl<A: Future> MaybeDone<A> {
132132
fn poll(&mut self) -> Result<bool, A::Error> {
133133
let res = match *self {
134-
MaybeDone::NotYet(ref mut a) => a.poll(),
134+
MaybeDone::NotYet(ref mut a) => try!(a.poll()),
135135
MaybeDone::Done(_) => return Ok(true),
136136
MaybeDone::Gone => panic!("cannot poll Join twice"),
137137
};
138138
match res {
139-
Poll::Ok(res) => {
139+
Async::Ready(res) => {
140140
*self = MaybeDone::Done(res);
141141
Ok(true)
142142
}
143-
Poll::Err(res) => Err(res),
144-
Poll::NotReady => Ok(false),
143+
Async::NotReady => Ok(false),
145144
}
146145
}
147146

0 commit comments

Comments
 (0)