Skip to content

Commit 4713a02

Browse files
bors[bot]oll3posborne
authored
Merge #69
69: Bump async dependencies r=eldruin a=oll3 Bumped tokio, mio and futures to recent versions. Took some inspiration from the previous gpio-cdev tokio work and also picked some parts from #60. Not tested the mio parts but the tokio example (from #60) seems to be working. Co-authored-by: Olle Sandberg <[email protected]> Co-authored-by: Paul Osborne <[email protected]>
2 parents 12b60cf + 28b52c0 commit 4713a02

File tree

6 files changed

+92
-139
lines changed

6 files changed

+92
-139
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
- Migrated to 'tokio' crate.
99
- Updated `nix` to version 0.22.
1010
- Minimmum supported Rust version updated to 1.46.0.
11+
- Updated `tokio`to version 1.
12+
- Updated `mio` to version 0.7.
13+
- Updated `futures` to version 0.3.
1114

1215
## [0.5.3] - 2018-04-19
1316

Cargo.toml

+11-3
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,21 @@ homepage = "https://github.com/rust-embedded/rust-sysfs-gpio"
1111
documentation = "https://docs.rs/sysfs_gpio/"
1212
description = "Provides access to GPIOs using the Linux sysfs interface."
1313
readme = "README.md"
14+
edition = "2018"
1415

1516
[features]
1617
mio-evented = ["mio"]
1718
async-tokio = ["futures", "tokio", "mio-evented"]
1819

1920
[dependencies]
20-
futures = { version = "0.1", optional = true }
21+
futures = { version = "0.3", optional = true }
2122
nix = "0.22"
22-
mio = { version = "0.6", optional = true }
23-
tokio = { version = "0.1", optional = true }
23+
mio = { version = "0.7", optional = true, features = ["os-ext"]}
24+
tokio = { version = "1", optional = true, features = ["net"] }
25+
26+
[dev-dependencies]
27+
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
28+
29+
[[example]]
30+
name = "tokio"
31+
required-features = ["async-tokio"]

examples/blinky.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,9 @@ fn get_args() -> Option<Arguments> {
5858
Err(_) => return None,
5959
};
6060
Some(Arguments {
61-
pin: pin,
62-
duration_ms: duration_ms,
63-
period_ms: period_ms,
61+
pin,
62+
duration_ms,
63+
period_ms,
6464
})
6565
}
6666

examples/tokio.rs

+24-43
Original file line numberDiff line numberDiff line change
@@ -1,63 +1,44 @@
1-
#[cfg(feature = "async-tokio")]
2-
extern crate futures;
3-
#[cfg(feature = "async-tokio")]
4-
extern crate sysfs_gpio;
5-
#[cfg(feature = "async-tokio")]
6-
extern crate tokio;
1+
// Copyright (c) 2020. The sysfs-gpio Authors.
72

8-
#[cfg(feature = "async-tokio")]
3+
use futures::future::join_all;
4+
use futures::StreamExt;
95
use std::env;
10-
11-
#[cfg(feature = "async-tokio")]
12-
use futures::{lazy, Future, Stream};
13-
14-
#[cfg(feature = "async-tokio")]
156
use sysfs_gpio::{Direction, Edge, Pin};
167

17-
#[cfg(feature = "async-tokio")]
18-
fn stream(pin_nums: Vec<u64>) -> sysfs_gpio::Result<()> {
8+
async fn monitor_pin(pin: Pin) -> Result<(), sysfs_gpio::Error> {
9+
pin.export()?;
10+
pin.set_direction(Direction::In)?;
11+
pin.set_edge(Edge::BothEdges)?;
12+
let mut gpio_events = pin.get_value_stream()?;
13+
while let Some(evt) = gpio_events.next().await {
14+
let val = evt.unwrap();
15+
println!("Pin {} changed value to {}", pin.get_pin_num(), val);
16+
}
17+
Ok(())
18+
}
19+
20+
async fn stream(pin_nums: Vec<u64>) {
1921
// NOTE: this currently runs forever and as such if
2022
// the app is stopped (Ctrl-C), no cleanup will happen
2123
// and the GPIO will be left exported. Not much
2224
// can be done about this as Rust signal handling isn't
2325
// really present at the moment. Revisit later.
24-
let pins: Vec<_> = pin_nums.iter().map(|&p| (p, Pin::new(p))).collect();
25-
let task = lazy(move || {
26-
for &(i, ref pin) in pins.iter() {
27-
pin.export().unwrap();
28-
pin.set_direction(Direction::In).unwrap();
29-
pin.set_edge(Edge::BothEdges).unwrap();
30-
tokio::spawn(
31-
pin.get_value_stream()
32-
.unwrap()
33-
.for_each(move |val| {
34-
println!("Pin {} changed value to {}", i, val);
35-
Ok(())
36-
})
37-
.map_err(|_| ()),
38-
);
39-
}
40-
Ok(())
41-
});
42-
tokio::run(task);
43-
44-
Ok(())
26+
join_all(pin_nums.into_iter().map(|p| {
27+
let pin = Pin::new(p);
28+
tokio::task::spawn(monitor_pin(pin))
29+
}))
30+
.await;
4531
}
4632

47-
#[cfg(feature = "async-tokio")]
48-
fn main() {
33+
#[tokio::main]
34+
async fn main() {
4935
let pins: Vec<u64> = env::args()
5036
.skip(1)
5137
.map(|a| a.parse().expect("Pins must be specified as integers"))
5238
.collect();
5339
if pins.is_empty() {
5440
println!("Usage: ./tokio <pin> [pin ...]");
5541
} else {
56-
stream(pins).unwrap();
42+
stream(pins).await;
5743
}
5844
}
59-
60-
#[cfg(not(feature = "async-tokio"))]
61-
fn main() {
62-
println!("This example requires the `tokio` feature to be enabled.");
63-
}

src/error.rs

-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
#[cfg(not(target_os = "wasi"))]
2-
use nix;
31
use std::convert;
42
use std::fmt;
53
use std::io;

src/lib.rs

+51-88
Original file line numberDiff line numberDiff line change
@@ -53,28 +53,29 @@ extern crate nix;
5353
#[cfg(feature = "async-tokio")]
5454
extern crate tokio;
5555

56-
use std::fs;
57-
use std::fs::File;
5856
use std::io;
5957
use std::io::prelude::*;
6058
#[cfg(any(target_os = "linux", target_os = "android", feature = "async-tokio"))]
6159
use std::io::SeekFrom;
6260
#[cfg(not(target_os = "wasi"))]
6361
use std::os::unix::prelude::*;
6462
use std::path::Path;
63+
use std::{fs, fs::File};
6564

6665
#[cfg(feature = "async-tokio")]
67-
use futures::{Async, Poll, Stream};
66+
use futures::{ready, Stream};
6867
#[cfg(feature = "mio-evented")]
69-
use mio::unix::EventedFd;
68+
use mio::event::Source;
7069
#[cfg(feature = "mio-evented")]
71-
use mio::Evented;
70+
use mio::unix::SourceFd;
7271
#[cfg(any(target_os = "linux", target_os = "android"))]
7372
use nix::sys::epoll::*;
7473
#[cfg(not(target_os = "wasi"))]
7574
use nix::unistd::close;
7675
#[cfg(feature = "async-tokio")]
77-
use tokio::reactor::{Handle, PollEvented};
76+
use std::task::Poll;
77+
#[cfg(feature = "async-tokio")]
78+
use tokio::io::unix::AsyncFd;
7879

7980
pub use error::Error;
8081

@@ -472,17 +473,6 @@ impl Pin {
472473
AsyncPinPoller::new(self.pin_num)
473474
}
474475

475-
/// Get a Stream of pin interrupts for this pin
476-
///
477-
/// The PinStream object can be used with the `tokio` crate. You should probably call
478-
/// `set_edge()` before using this.
479-
///
480-
/// This method is only available when the `async-tokio` crate feature is enabled.
481-
#[cfg(feature = "async-tokio")]
482-
pub fn get_stream_with_handle(&self, handle: &Handle) -> Result<PinStream> {
483-
PinStream::init_with_handle(*self, handle)
484-
}
485-
486476
/// Get a Stream of pin interrupts for this pin
487477
///
488478
/// The PinStream object can be used with the `tokio` crate. You should probably call
@@ -494,22 +484,6 @@ impl Pin {
494484
PinStream::init(*self)
495485
}
496486

497-
/// Get a Stream of pin values for this pin
498-
///
499-
/// The PinStream object can be used with the `tokio` crate. You should probably call
500-
/// `set_edge(Edge::BothEdges)` before using this.
501-
///
502-
/// Note that the values produced are the value of the pin as soon as we get to handling the
503-
/// interrupt in userspace. Each time this stream produces a value, a change has occurred, but
504-
/// it could end up producing the same value multiple times if the value has changed back
505-
/// between when the interrupt occurred and when the value was read.
506-
///
507-
/// This method is only available when the `async-tokio` crate feature is enabled.
508-
#[cfg(feature = "async-tokio")]
509-
pub fn get_value_stream_with_handle(&self, handle: &Handle) -> Result<PinValueStream> {
510-
Ok(PinValueStream(PinStream::init_with_handle(*self, handle)?))
511-
}
512-
513487
/// Get a Stream of pin values for this pin
514488
///
515489
/// The PinStream object can be used with the `tokio` crate. You should probably call
@@ -536,9 +510,9 @@ fn extract_pin_fom_path_test() {
536510
let tok3 = Pin::extract_pin_from_path(&"../../devices/soc0/gpiochip3/gpio/gpio124");
537511
assert_eq!(124, tok3.unwrap());
538512
let err1 = Pin::extract_pin_from_path(&"/sys/CLASS/gpio/gpio");
539-
assert_eq!(true, err1.is_err());
513+
assert!(err1.is_err());
540514
let err2 = Pin::extract_pin_from_path(&"/sys/class/gpio/gpioSDS");
541-
assert_eq!(true, err2.is_err());
515+
assert!(err2.is_err());
542516
}
543517
#[cfg(not(target_os = "wasi"))]
544518
#[derive(Debug)]
@@ -643,76 +617,70 @@ impl AsyncPinPoller {
643617
}
644618

645619
#[cfg(feature = "mio-evented")]
646-
impl Evented for AsyncPinPoller {
620+
impl Source for AsyncPinPoller {
647621
fn register(
648-
&self,
649-
poll: &mio::Poll,
622+
&mut self,
623+
poll: &mio::Registry,
650624
token: mio::Token,
651-
interest: mio::Ready,
652-
opts: mio::PollOpt,
625+
interest: mio::Interest,
653626
) -> io::Result<()> {
654-
EventedFd(&self.devfile.as_raw_fd()).register(poll, token, interest, opts)
627+
SourceFd(&self.as_raw_fd()).register(poll, token, interest)
655628
}
656629

657630
fn reregister(
658-
&self,
659-
poll: &mio::Poll,
631+
&mut self,
632+
poll: &mio::Registry,
660633
token: mio::Token,
661-
interest: mio::Ready,
662-
opts: mio::PollOpt,
634+
interest: mio::Interest,
663635
) -> io::Result<()> {
664-
EventedFd(&self.devfile.as_raw_fd()).reregister(poll, token, interest, opts)
636+
SourceFd(&self.as_raw_fd()).reregister(poll, token, interest)
665637
}
666638

667-
fn deregister(&self, poll: &mio::Poll) -> io::Result<()> {
668-
EventedFd(&self.devfile.as_raw_fd()).deregister(poll)
639+
fn deregister(&mut self, poll: &mio::Registry) -> io::Result<()> {
640+
SourceFd(&self.as_raw_fd()).deregister(poll)
669641
}
670642
}
671643

672-
#[cfg(feature = "async-tokio")]
673-
pub struct PinStream {
674-
evented: PollEvented<AsyncPinPoller>,
675-
skipped_first_event: bool,
644+
#[cfg(any(feature = "async-tokio", feature = "mio-evented"))]
645+
impl AsRawFd for AsyncPinPoller {
646+
fn as_raw_fd(&self) -> RawFd {
647+
self.devfile.as_raw_fd()
648+
}
676649
}
677650

678651
#[cfg(feature = "async-tokio")]
679-
impl PinStream {
680-
pub fn init_with_handle(pin: Pin, handle: &Handle) -> Result<Self> {
681-
Ok(PinStream {
682-
evented: PollEvented::new(pin.get_async_poller()?, handle)?,
683-
skipped_first_event: false,
684-
})
685-
}
652+
pub struct PinStream {
653+
evented: AsyncFd<AsyncPinPoller>,
654+
skipped_first_event: bool,
686655
}
687656

688657
#[cfg(feature = "async-tokio")]
689658
impl PinStream {
690659
pub fn init(pin: Pin) -> Result<Self> {
691660
Ok(PinStream {
692-
evented: PollEvented::new(pin.get_async_poller()?, &Handle::default())?,
661+
evented: AsyncFd::new(pin.get_async_poller()?)?,
693662
skipped_first_event: false,
694663
})
695664
}
696665
}
697666

698667
#[cfg(feature = "async-tokio")]
699668
impl Stream for PinStream {
700-
type Item = ();
701-
type Error = Error;
702-
703-
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
704-
Ok(match self.evented.poll_read() {
705-
Async::Ready(()) => {
706-
self.evented.need_read()?;
707-
if self.skipped_first_event {
708-
Async::Ready(Some(()))
709-
} else {
710-
self.skipped_first_event = true;
711-
Async::NotReady
712-
}
669+
type Item = Result<()>;
670+
671+
fn poll_next(
672+
mut self: std::pin::Pin<&mut Self>,
673+
cx: &mut std::task::Context<'_>,
674+
) -> Poll<Option<Self::Item>> {
675+
loop {
676+
let mut guard = ready!(self.evented.poll_read_ready(cx))?;
677+
guard.clear_ready();
678+
if self.skipped_first_event {
679+
return Poll::Ready(Some(Ok(())));
680+
} else {
681+
self.skipped_first_event = true;
713682
}
714-
Async::NotReady => Async::NotReady,
715-
})
683+
}
716684
}
717685
}
718686

@@ -729,18 +697,13 @@ impl PinValueStream {
729697

730698
#[cfg(feature = "async-tokio")]
731699
impl Stream for PinValueStream {
732-
type Item = u8;
733-
type Error = Error;
734-
735-
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
736-
match self.0.poll() {
737-
Ok(Async::Ready(Some(()))) => {
738-
let value = self.get_value()?;
739-
Ok(Async::Ready(Some(value)))
740-
}
741-
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
742-
Ok(Async::NotReady) => Ok(Async::NotReady),
743-
Err(e) => Err(e),
744-
}
700+
type Item = Result<u8>;
701+
702+
fn poll_next(
703+
mut self: std::pin::Pin<&mut Self>,
704+
cx: &mut std::task::Context<'_>,
705+
) -> Poll<Option<Self::Item>> {
706+
ready!(std::pin::Pin::new(&mut self.0).poll_next(cx));
707+
Poll::Ready(Some(Ok(self.get_value()?)))
745708
}
746709
}

0 commit comments

Comments
 (0)