Skip to content

Commit b1a161a

Browse files
committed
Futures 0.3
1 parent 6f7792a commit b1a161a

File tree

2 files changed

+55
-60
lines changed

2 files changed

+55
-60
lines changed

Cargo.toml

+3-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ mio-evented = ["mio"]
1414
use_tokio = ["futures", "tokio", "mio-evented"]
1515

1616
[dependencies]
17-
futures = { version = "0.1", optional = true }
17+
futures = { version = "0.3", optional = true }
1818
nix = "0.14.0"
1919
mio = { version = "0.6", optional = true }
20-
tokio = { version = "0.1", optional = true }
20+
tokio = { version = "0.2", optional = true, features = ["io-driver"] }
21+
pin-utils = "0.1.0-alpha.4"

src/lib.rs

+52-58
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ extern crate futures;
4848
extern crate mio;
4949
extern crate nix;
5050
#[cfg(feature = "use_tokio")]
51+
extern crate pin_utils;
52+
#[cfg(feature = "use_tokio")]
5153
extern crate tokio;
5254

5355
use std::fs;
@@ -58,18 +60,21 @@ use std::io::prelude::*;
5860
use std::io::SeekFrom;
5961
use std::os::unix::prelude::*;
6062
use std::path::Path;
63+
use std::pin::{Pin as SPin};
6164

6265
#[cfg(feature = "use_tokio")]
63-
use futures::{Async, Poll, Stream};
66+
use futures::{Stream, task::{Context, Poll}};
6467
#[cfg(feature = "mio-evented")]
6568
use mio::unix::EventedFd;
6669
#[cfg(feature = "mio-evented")]
67-
use mio::Evented;
70+
use mio::{Evented, Ready};
6871
#[cfg(any(target_os = "linux", target_os = "android"))]
6972
use nix::sys::epoll::*;
7073
use nix::unistd::close;
7174
#[cfg(feature = "use_tokio")]
72-
use tokio::reactor::{Handle, PollEvented};
75+
use pin_utils::unsafe_pinned;
76+
#[cfg(feature = "use_tokio")]
77+
use tokio::{io::PollEvented, runtime::Handle};
7378

7479
pub use error::Error;
7580

@@ -466,17 +471,6 @@ impl Pin {
466471
AsyncPinPoller::new(self.pin_num)
467472
}
468473

469-
/// Get a Stream of pin interrupts for this pin
470-
///
471-
/// The PinStream object can be used with the `tokio` crate. You should probably call
472-
/// `set_edge()` before using this.
473-
///
474-
/// This method is only available when the `use_tokio` crate feature is enabled.
475-
#[cfg(feature = "use_tokio")]
476-
pub fn get_stream_with_handle(&self, handle: &Handle) -> Result<PinStream> {
477-
PinStream::init_with_handle(self.clone(), handle)
478-
}
479-
480474
/// Get a Stream of pin interrupts for this pin
481475
///
482476
/// The PinStream object can be used with the `tokio` crate. You should probably call
@@ -500,11 +494,8 @@ impl Pin {
500494
///
501495
/// This method is only available when the `use_tokio` crate feature is enabled.
502496
#[cfg(feature = "use_tokio")]
503-
pub fn get_value_stream_with_handle(&self, handle: &Handle) -> Result<PinValueStream> {
504-
Ok(PinValueStream(PinStream::init_with_handle(
505-
self.clone(),
506-
handle,
507-
)?))
497+
pub fn get_value_stream_with_handle(&self, _handle: &Handle) -> Result<PinValueStream> {
498+
Ok(PinValueStream::new(PinStream::init(self.clone())?))
508499
}
509500

510501
/// Get a Stream of pin values for this pin
@@ -520,7 +511,7 @@ impl Pin {
520511
/// This method is only available when the `use_tokio` crate feature is enabled.
521512
#[cfg(feature = "use_tokio")]
522513
pub fn get_value_stream(&self) -> Result<PinValueStream> {
523-
Ok(PinValueStream(PinStream::init(self.clone())?))
514+
Ok(PinValueStream::new(PinStream::init(self.clone())?))
524515
}
525516
}
526517

@@ -671,72 +662,75 @@ pub struct PinStream {
671662
skipped_first_event: bool,
672663
}
673664

674-
#[cfg(feature = "use_tokio")]
675-
impl PinStream {
676-
pub fn init_with_handle(pin: Pin, handle: &Handle) -> Result<Self> {
677-
Ok(PinStream {
678-
evented: PollEvented::new(pin.get_async_poller()?, &handle)?,
679-
skipped_first_event: false,
680-
})
681-
}
682-
}
683-
684665
#[cfg(feature = "use_tokio")]
685666
impl PinStream {
686667
pub fn init(pin: Pin) -> Result<Self> {
687668
Ok(PinStream {
688-
evented: PollEvented::new(pin.get_async_poller()?, &Handle::default())?,
669+
evented: PollEvented::new(pin.get_async_poller()?)?,
689670
skipped_first_event: false,
690671
})
691672
}
692673
}
693674

694675
#[cfg(feature = "use_tokio")]
695676
impl Stream for PinStream {
696-
type Item = ();
697-
type Error = Error;
698-
699-
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
700-
Ok(match self.evented.poll_read() {
701-
Async::Ready(()) => {
702-
self.evented.need_read();
703-
if self.skipped_first_event {
704-
Async::Ready(Some(()))
705-
} else {
706-
self.skipped_first_event = true;
707-
Async::NotReady
677+
type Item = Result<()>;
678+
679+
fn poll_next(mut self: SPin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
680+
match self.evented.poll_read_ready(cx, Ready::readable()) {
681+
Poll::Ready(res) => {
682+
match res {
683+
Ok(_) => {
684+
let _ = self.evented.clear_read_ready(cx, Ready::readable());
685+
if self.skipped_first_event {
686+
Poll::Ready(Some(Ok(())))
687+
} else {
688+
self.as_mut().skipped_first_event = true;
689+
Poll::Pending
690+
}
691+
}
692+
Err(e) => Poll::Ready(Some(Err(Error::Io(e))))
708693
}
709694
}
710-
Async::NotReady => Async::NotReady,
711-
})
695+
Poll::Pending => Poll::Pending,
696+
}
712697
}
713698
}
714699

715700
#[cfg(feature = "use_tokio")]
716-
pub struct PinValueStream(PinStream);
701+
pub struct PinValueStream { inner: PinStream }
717702

718703
#[cfg(feature = "use_tokio")]
719704
impl PinValueStream {
705+
unsafe_pinned!(inner: PinStream);
706+
707+
fn new(inner: PinStream) -> Self {
708+
PinValueStream { inner }
709+
}
710+
720711
#[inline]
721712
fn get_value(&mut self) -> Result<u8> {
722-
get_value_from_file(&mut self.0.evented.get_mut().devfile)
713+
get_value_from_file(&mut self.inner.evented.get_mut().devfile)
723714
}
724715
}
725716

726717
#[cfg(feature = "use_tokio")]
727718
impl Stream for PinValueStream {
728-
type Item = u8;
729-
type Error = Error;
730-
731-
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
732-
match self.0.poll() {
733-
Ok(Async::Ready(Some(()))) => {
734-
let value = try!(self.get_value());
735-
Ok(Async::Ready(Some(value)))
719+
type Item = Result<u8>;
720+
721+
fn poll_next(mut self: SPin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
722+
match self.as_mut().inner().poll_next(cx) {
723+
Poll::Ready(Some(res)) => {
724+
match res {
725+
Ok(_) => {
726+
let value = self.get_value();
727+
Poll::Ready(Some(value))
728+
}
729+
Err(e) => Poll::Ready(Some(Err(e)))
730+
}
736731
}
737-
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
738-
Ok(Async::NotReady) => Ok(Async::NotReady),
739-
Err(e) => Err(e),
732+
Poll::Ready(None) => Poll::Ready(None),
733+
Poll::Pending => Poll::Pending,
740734
}
741735
}
742736
}

0 commit comments

Comments
 (0)