Skip to content

Commit a61c994

Browse files
committed
Merge branch 'release/v0.1.0'
2 parents 40e17d4 + ba10875 commit a61c994

File tree

13 files changed

+404
-12
lines changed

13 files changed

+404
-12
lines changed

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,8 @@ Cargo.lock
4141
# These are backup files generated by rustfmt
4242
**/*.rs.bk
4343

44+
# Others
45+
async-pipe-rs.iws
46+
47+
4448
# End of https://www.gitignore.io/api/rust,macos

.idea/.gitignore

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/misc.xml

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/modules.xml

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/vcs.xml

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,19 @@
11
[package]
2-
name = "async-pipe-rs"
2+
name = "async-pipe"
33
version = "0.1.0"
4+
description = "Creates an asynchronous piped reader and writer pair using tokio.rs"
5+
homepage = "https://github.com/rousan/async-pipe-rs"
6+
repository = "https://github.com/rousan/async-pipe-rs"
7+
keywords = ["pipe", "future", "async", "reader", "writer"]
8+
categories = ["asynchronous"]
49
authors = ["Rousan Ali <[email protected]>"]
10+
readme = "README.md"
11+
license = "MIT"
512
edition = "2018"
613

7-
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
8-
914
[dependencies]
15+
tokio = { version = "0.2", features= [] }
16+
log = "0.4"
17+
18+
[dev-dependencies]
19+
tokio = { version = "0.2", features = ["full"] }

README.md

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,33 @@
1-
# pipe-rs
1+
# async-pipe-rs
22

3-
```Rust
4-
pipe<W: AsyncWrite, R: AsyncRead>() -> (W, R)
3+
[![crates.io](https://img.shields.io/crates/v/async-rs.svg)](https://crates.io/crates/async-rs)
4+
[![Documentation](https://docs.rs/async-rs/badge.svg)](https://docs.rs/async-rs)
5+
[![MIT](https://img.shields.io/crates/l/async-rs.svg)](./LICENSE)
6+
7+
Creates an asynchronous piped reader and writer pair using `tokio.rs`.
8+
9+
[Docs](https://docs.rs/async-rs)
10+
11+
## Example
12+
13+
```rust
14+
use async_pipe;
15+
use tokio::prelude::*;
16+
17+
#[tokio::main]
18+
async fn main() {
19+
let (mut w, mut r) = async_pipe::pipe();
20+
21+
tokio::spawn(async move {
22+
w.write_all(b"hello world").await.unwrap();
23+
});
24+
25+
let mut v = Vec::new();
26+
r.read_to_end(&mut v).await.unwrap();
27+
println!("Received: {:?}", String::from_utf8(v));
28+
}
529
```
30+
31+
## Contributing
32+
33+
Your PRs and stars are always welcome.

async-pipe-rs.iml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<module type="RUST_MODULE" version="4">
3+
<component name="NewModuleRootManager" inherit-compiler-output="true">
4+
<exclude-output />
5+
<content url="file://$MODULE_DIR$">
6+
<sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
7+
<sourceFolder url="file://$MODULE_DIR$/examples" isTestSource="false" />
8+
<sourceFolder url="file://$MODULE_DIR$/tests" isTestSource="true" />
9+
<sourceFolder url="file://$MODULE_DIR$/benches" isTestSource="true" />
10+
<excludeFolder url="file://$MODULE_DIR$/target" />
11+
</content>
12+
<orderEntry type="inheritedJdk" />
13+
<orderEntry type="sourceFolder" forTests="false" />
14+
</component>
15+
</module>

examples/main.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
use async_pipe;
2+
use tokio::prelude::*;
3+
4+
#[tokio::main]
5+
async fn main() {
6+
let (mut w, mut r) = async_pipe::pipe();
7+
8+
tokio::spawn(async move {
9+
w.write_all(b"hello world").await.unwrap();
10+
});
11+
12+
let mut v = Vec::new();
13+
r.read_to_end(&mut v).await.unwrap();
14+
println!("Received: {:?}", String::from_utf8(v));
15+
}

src/lib.rs

Lines changed: 55 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,56 @@
1-
#[cfg(test)]
2-
mod tests {
3-
#[test]
4-
fn it_works() {
5-
assert_eq!(2 + 2, 4);
6-
}
1+
//! Creates an asynchronous piped reader and writer pair using `tokio.rs`.
2+
//!
3+
//! # Examples
4+
//!
5+
//! ```
6+
//! # async fn run() {
7+
//! use async_pipe;
8+
//! use tokio::prelude::*;
9+
//!
10+
//! let (mut w, mut r) = async_pipe::pipe();
11+
//!
12+
//! tokio::spawn(async move {
13+
//! w.write_all(b"hello world").await.unwrap();
14+
//! });
15+
//!
16+
//! let mut v = Vec::new();
17+
//! r.read_to_end(&mut v).await.unwrap();
18+
//!
19+
//! println!("Received: {:?}", String::from_utf8(v));
20+
//! # }
21+
//!
22+
//! tokio::runtime::Runtime::new().unwrap().block_on(run());
23+
//! ```
24+
25+
use state::State;
26+
use std::sync::{Arc, Mutex};
27+
28+
pub use self::reader::PipeReader;
29+
pub use self::writer::PipeWriter;
30+
31+
mod reader;
32+
mod state;
33+
mod writer;
34+
35+
/// Creates a piped pair of an [`AsyncWrite`](https://docs.rs/tokio/0.2.16/tokio/io/trait.AsyncWrite.html) and an [`AsyncRead`](https://docs.rs/tokio/0.2.15/tokio/io/trait.AsyncRead.html).
36+
pub fn pipe() -> (PipeWriter, PipeReader) {
37+
let shared_state = Arc::new(Mutex::new(State {
38+
reader_waker: None,
39+
writer_waker: None,
40+
data: None,
41+
done_reading: false,
42+
read: 0,
43+
done_cycle: true,
44+
closed: false,
45+
}));
46+
47+
let w = PipeWriter {
48+
state: Arc::clone(&shared_state),
49+
};
50+
51+
let r = PipeReader {
52+
state: Arc::clone(&shared_state),
53+
};
54+
55+
(w, r)
756
}

src/reader.rs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
use crate::state::{Data, State};
2+
use std::pin::Pin;
3+
use std::ptr;
4+
use std::sync::{Arc, Mutex};
5+
use std::task::{Context, Poll};
6+
use tokio::io::{self, AsyncRead};
7+
8+
/// The read half of the pipe which implements [`AsyncRead`](https://docs.rs/tokio/0.2.15/tokio/io/trait.AsyncRead.html).
9+
pub struct PipeReader {
10+
pub(crate) state: Arc<Mutex<State>>,
11+
}
12+
13+
impl PipeReader {
14+
/// Closes the pipe, any further read will return EOF and any further write will raise an error.
15+
pub fn close(&self) -> io::Result<()> {
16+
match self.state.lock() {
17+
Ok(mut state) => {
18+
state.closed = true;
19+
self.wake_writer_half(&*state);
20+
Ok(())
21+
}
22+
Err(err) => Err(io::Error::new(
23+
io::ErrorKind::Other,
24+
format!(
25+
"{}: PipeReader: Failed to lock the channel state: {}",
26+
env!("CARGO_PKG_NAME"),
27+
err
28+
),
29+
)),
30+
}
31+
}
32+
33+
fn wake_writer_half(&self, state: &State) {
34+
if let Some(ref waker) = state.writer_waker {
35+
waker.clone().wake();
36+
}
37+
}
38+
39+
fn copy_data_into_buffer(&self, data: &Data, buf: &mut [u8]) -> usize {
40+
let len = data.len.min(buf.len());
41+
unsafe {
42+
ptr::copy_nonoverlapping(data.ptr, buf.as_mut_ptr(), len);
43+
}
44+
len
45+
}
46+
}
47+
48+
impl Drop for PipeReader {
49+
fn drop(&mut self) {
50+
if let Err(err) = self.close() {
51+
log::warn!(
52+
"{}: PipeReader: Failed to close the channel on drop: {}",
53+
env!("CARGO_PKG_NAME"),
54+
err
55+
);
56+
}
57+
}
58+
}
59+
60+
impl AsyncRead for PipeReader {
61+
fn poll_read(
62+
self: Pin<&mut Self>,
63+
cx: &mut Context,
64+
buf: &mut [u8],
65+
) -> Poll<io::Result<usize>> {
66+
let mut state;
67+
match self.state.lock() {
68+
Ok(s) => state = s,
69+
Err(err) => {
70+
return Poll::Ready(Err(io::Error::new(
71+
io::ErrorKind::Other,
72+
format!(
73+
"{}: PipeReader: Failed to lock the channel state: {}",
74+
env!("CARGO_PKG_NAME"),
75+
err
76+
),
77+
)))
78+
}
79+
}
80+
81+
if state.closed {
82+
return Poll::Ready(Ok(0));
83+
}
84+
85+
return if state.done_cycle {
86+
state.reader_waker = Some(cx.waker().clone());
87+
Poll::Pending
88+
} else {
89+
if let Some(ref data) = state.data {
90+
let copied_bytes_len = self.copy_data_into_buffer(data, buf);
91+
92+
state.data = None;
93+
state.read = copied_bytes_len;
94+
state.done_reading = true;
95+
state.reader_waker = None;
96+
97+
self.wake_writer_half(&*state);
98+
99+
Poll::Ready(Ok(copied_bytes_len))
100+
} else {
101+
state.reader_waker = Some(cx.waker().clone());
102+
Poll::Pending
103+
}
104+
};
105+
}
106+
}

src/state.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
use std::task::Waker;
2+
3+
pub(crate) struct State {
4+
pub(crate) reader_waker: Option<Waker>,
5+
pub(crate) writer_waker: Option<Waker>,
6+
pub(crate) data: Option<Data>,
7+
pub(crate) done_reading: bool,
8+
pub(crate) read: usize,
9+
pub(crate) done_cycle: bool,
10+
pub(crate) closed: bool,
11+
}
12+
13+
pub(crate) struct Data {
14+
pub(crate) ptr: *const u8,
15+
pub(crate) len: usize,
16+
}
17+
18+
unsafe impl Send for Data {}

0 commit comments

Comments
 (0)