Skip to content
This repository was archived by the owner on Oct 30, 2019. It is now read-only.

for_await macro #25

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ edition = "2018"
futures-preview = "0.3.0-alpha.15"
runtime-attributes = { path = "runtime-attributes", version = "0.3.0-alpha.3" }
runtime-raw = { path = "runtime-raw", version = "0.3.0-alpha.2" }
runtime-native = { path = "runtime-native", version = "0.3.0-alpha.2" }
# runtime-native = { path = "runtime-native", version = "0.3.0-alpha.2" }

[dev-dependencies]
failure = "0.1.5"
Expand All @@ -40,3 +40,8 @@ members = [
"runtime-raw",
"runtime-tokio",
]

[patch.crates-io.futures-preview]
git = "https://github.com/taiki-e/futures-rs"
branch = "async-stream"
features = ["async-stream", "nightly"]
9 changes: 5 additions & 4 deletions examples/guessing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ async fn main() -> Result<(), failure::Error> {
let mut listener = TcpListener::bind("127.0.0.1:8080")?;
println!("Listening on {}", &listener.local_addr()?);

let mut incoming = listener.incoming();
while let Some(stream) = await!(incoming.next()) {
runtime::spawn(play(stream?));
}
let incoming = listener.incoming().map_err(|e| e.into());
await!(incoming.try_for_each_concurrent(!0, async move |stream| {
await!(runtime::spawn(play(stream)))?;
Ok::<(), failure::Error>(())
}))?;
Ok(())
}
21 changes: 9 additions & 12 deletions examples/tcp-echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,24 @@
//! Run the server and connect to it with `nc 127.0.0.1 8080`.
//! The server will wait for you to enter lines of text and then echo them back.

#![feature(async_await, await_macro)]
#![feature(async_await, await_macro, stmt_expr_attributes, proc_macro_hygiene)]

use futures::prelude::*;
use runtime::net::TcpListener;
use runtime::for_await;

#[runtime::main]
#[runtime::main(runtime_tokio::Tokio)]
async fn main() -> std::io::Result<()> {
let mut listener = TcpListener::bind("127.0.0.1:8080")?;
println!("Listening on {}", listener.local_addr()?);

// accept connections and process them in parallel
let mut incoming = listener.incoming();
while let Some(stream) = await!(incoming.next()) {
runtime::spawn(async move {
let stream = stream?;
println!("Accepting from: {}", stream.peer_addr()?);
#[for_await(try_parallel)]
for stream in listener.incoming() {
println!("Accepting from: {}", stream.peer_addr()?);

let (reader, writer) = &mut stream.split();
await!(reader.copy_into(writer))?;
Ok::<(), std::io::Error>(())
});
let (reader, writer) = &mut stream.split();
await!(reader.copy_into(writer))?;
Ok::<(), std::io::Error>(())
}
Ok(())
}
40 changes: 19 additions & 21 deletions examples/tcp-proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,26 @@ async fn main() -> std::io::Result<()> {
let mut listener = TcpListener::bind("127.0.0.1:8081")?;
println!("Listening on {}", listener.local_addr()?);

// accept connections and process them serially
let mut incoming = listener.incoming();
while let Some(client) = await!(incoming.next()) {
let handle = runtime::spawn(async move {
let client = client?;
let server = await!(TcpStream::connect("127.0.0.1:8080"))?;
println!(
"Proxying {} to {}",
client.peer_addr()?,
server.peer_addr()?
);
// accept connections and process them in parallel
await!(listener
.incoming()
.try_for_each_concurrent(!0, async move |client| {
await!(runtime::spawn(async move {
let server = await!(TcpStream::connect("127.0.0.1:8080"))?;
println!(
"Proxying {} to {}",
client.peer_addr()?,
server.peer_addr()?
);

let (cr, cw) = &mut client.split();
let (sr, sw) = &mut server.split();
let a = cr.copy_into(sw);
let b = sr.copy_into(cw);
try_join!(a, b)?;
let (cr, cw) = &mut client.split();
let (sr, sw) = &mut server.split();
let a = cr.copy_into(sw);
let b = sr.copy_into(cw);
try_join!(a, b)?;

Ok::<(), std::io::Error>(())
});

await!(handle)?;
}
Ok::<(), std::io::Error>(())
}))
}))?;
Ok(())
}
2 changes: 1 addition & 1 deletion runtime-attributes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ edition = "2018"
proc-macro = true

[dependencies]
syn = { version = "0.15.33", features = ["full"] }
syn = { version = "0.15.34", features = ["full", "extra-traits"] }
proc-macro2 = { version = "0.4.29", features = ["nightly"] }
quote = "0.6.12"

Expand Down
66 changes: 66 additions & 0 deletions runtime-attributes/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,69 @@ pub fn bench(attr: TokenStream, item: TokenStream) -> TokenStream {

result.into()
}

/// Create an async loop.
///
/// # Examples
///
/// ```ignore
/// #![feature(async_await, await_macro)]
///
/// use futures::prelude::*;
/// use futures::stream;
/// use runtime::for_await;
///
/// #[runtime::main]
/// async fn main() {
/// // Print items in a series
/// #[for_await]
/// for value in stream::iter(1..=5) {
/// println!("{}", value);
/// }
///
/// // Print items in a series
/// #[for_await(serial)]
/// for value in stream::iter(1..=5) {
/// println!("{}", value);
/// }
///
/// // Print items in parallel, spawning each iteration on the threadpool
/// #[for_await(try_parallel)]
/// for value in stream::iter(1..=5) {
/// println!("{}", value);
/// }
/// }
/// ```
#[proc_macro_attribute]
pub fn for_await(attr: TokenStream, item: TokenStream) -> TokenStream {
let attr_value = if attr.is_empty() {
syn::parse_str("serial").unwrap()
} else {
syn::parse_macro_input!(attr as syn::Expr)
};

let input = syn::parse_macro_input!(item as syn::ExprForLoop);
let attrs = &input.attrs;
let pat = &input.pat;
let expr = &input.expr;
let body_block = &input.body;

match &*format!("{:?}", attr_value) {
"serial" => quote! {
#(#attrs)*
#[futures::for_await]
for #pat in #expr #body_block
}
.into(),
"try_parallel" => quote! {
let stream = #expr.map_err(|e| e.into());
await!(stream.try_for_each_concurrent(None, async move |#expr| {
await!(runtime::spawn(async move #body_block))
}))?;
}
.into(),
_ => TokenStream::from(quote_spanned! {
input.span() => compile_error!(r##"#[for_await] takes an optional argument of either "serial" or "try_parallel"##);
}),
}
}
6 changes: 3 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pub mod task;
pub use task::spawn;

#[doc(inline)]
pub use runtime_attributes::{bench, test};
pub use runtime_attributes::{bench, for_await, test};

#[doc(inline)]
#[cfg(not(test))] // NOTE: exporting main breaks tests, we should file an issue.
Expand All @@ -110,5 +110,5 @@ pub use runtime_attributes::main;
#[doc(hidden)]
pub use runtime_raw as raw;

#[doc(hidden)]
pub use runtime_native as native;
// #[doc(hidden)]
// pub use runtime_native as native;
16 changes: 0 additions & 16 deletions src/net/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,14 +175,6 @@ impl AsyncRead for TcpStream {
) -> Poll<io::Result<usize>> {
self.inner.as_mut().poll_read(cx, buf)
}

fn poll_vectored_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
vec: &mut [&mut IoVec],
) -> Poll<io::Result<usize>> {
self.inner.as_mut().poll_vectored_read(cx, vec)
}
}

impl AsyncWrite for TcpStream {
Expand All @@ -201,14 +193,6 @@ impl AsyncWrite for TcpStream {
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.inner.as_mut().poll_close(cx)
}

fn poll_vectored_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
vec: &[&IoVec],
) -> Poll<io::Result<usize>> {
self.inner.as_mut().poll_vectored_write(cx, vec)
}
}

/// The future returned by [`TcpStream::connect`].
Expand Down