Skip to content

Commit a9f2ea2

Browse files
committed
Add async_stream to create streams via generators
1 parent 2e2b013 commit a9f2ea2

File tree

16 files changed

+816
-1
lines changed

16 files changed

+816
-1
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
[workspace]
22
members = [
33
"futures",
4+
"futures-async-macro",
45
"futures-core",
56
"futures-channel",
67
"futures-executor",

futures-async-macro/Cargo.toml

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
[package]
2+
name = "futures-async-macro-preview"
3+
edition = "2018"
4+
version = "0.3.0-alpha.15"
5+
authors = ["Alex Crichton <[email protected]>"]
6+
license = "MIT OR Apache-2.0"
7+
repository = "https://github.com/rust-lang-nursery/futures-rs"
8+
homepage = "https://rust-lang-nursery.github.io/futures-rs"
9+
documentation = "https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.14/futures_async_macro"
10+
description = """
11+
Definition of the `#[async_stream]` macro for the `futures-rs` crate as well as a few other assorted macros.
12+
"""
13+
14+
[lib]
15+
name = "futures_async_macro"
16+
proc-macro = true
17+
18+
[features]
19+
std = []
20+
21+
[dependencies]
22+
proc-macro2 = "0.4"
23+
quote = "0.6"
24+
syn = { version = "0.15.31", features = ["full", "fold"] }

futures-async-macro/LICENSE-APACHE

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
../LICENSE-APACHE

futures-async-macro/LICENSE-MIT

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
../LICENSE-MIT

futures-async-macro/README.md

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
2+
**Note that these are experimental APIs.**
3+
4+
## Usage
5+
6+
Add this to your `Cargo.toml`:
7+
8+
```toml
9+
[dependencies]
10+
futures-preview = { version = "0.3.0-alpha.14", features = ["async-stream", "nightly"] }
11+
```
12+
13+
### \#\[for_await\]
14+
15+
Processes streams using a for loop.
16+
17+
This is a reimplement of [futures-await]'s `#[async]` for loops for futures 0.3 and is an experimental implementation of [the idea listed as the next step of async/await](https://github.com/rust-lang/rfcs/blob/master/text/2394-async_await.md#for-await-and-processing-streams).
18+
19+
```rust
20+
use futures::for_await;
21+
use futures::prelude::*;
22+
23+
#[for_await]
24+
for value in stream::iter(1..=5) {
25+
println!("{}", value);
26+
}
27+
```
28+
29+
`value` has the `Item` type of the stream passed in. Note that async for loops can only be used inside of `async` functions or `#[async_stream]` functions.
30+
31+
### \#\[async_stream\]
32+
33+
This is a reimplement of [futures-await]'s `#[async_stream]` for futures 0.3 and is an experimental implementation of [the idea listed as the next step of async/await](https://github.com/rust-lang/rfcs/blob/master/text/2394-async_await.md#generators-and-streams).
34+
35+
```rust
36+
use futures::prelude::*;
37+
use futures::{async_stream, stream_yield};
38+
39+
// Returns a stream of i32
40+
#[async_stream]
41+
fn foo(stream: impl Stream<Item = String>) -> i32 {
42+
#[for_await]
43+
for x in stream {
44+
stream_yield!(x.parse().unwrap());
45+
}
46+
}
47+
```
48+
49+
`#[async_stream]` have an item type specified via `-> some::Path` and the values output from the stream must be yielded via the `stream_yield!` macro.
50+
51+
[futures-await]: https://github.com/alexcrichton/futures-await

futures-async-macro/src/elision.rs

+101
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
use proc_macro2::Span;
2+
use syn::fold::Fold;
3+
use syn::punctuated::Punctuated;
4+
use syn::token::Comma;
5+
use syn::{ArgSelfRef, FnArg, GenericParam, Lifetime, LifetimeDef, TypeReference};
6+
7+
pub(super) fn unelide_lifetimes(
8+
generics: &mut Punctuated<GenericParam, Comma>,
9+
args: Vec<FnArg>,
10+
) -> Vec<FnArg> {
11+
let mut folder = UnelideLifetimes::new(generics);
12+
args.into_iter().map(|arg| folder.fold_fn_arg(arg)).collect()
13+
}
14+
15+
struct UnelideLifetimes<'a> {
16+
generics: &'a mut Punctuated<GenericParam, Comma>,
17+
lifetime_index: usize,
18+
lifetime_name: String,
19+
count: u32,
20+
}
21+
22+
impl<'a> UnelideLifetimes<'a> {
23+
fn new(generics: &'a mut Punctuated<GenericParam, Comma>) -> UnelideLifetimes<'a> {
24+
let lifetime_index = lifetime_index(generics);
25+
let lifetime_name = lifetime_name(generics);
26+
UnelideLifetimes { generics, lifetime_index, lifetime_name, count: 0 }
27+
}
28+
29+
// Constitute a new lifetime
30+
fn new_lifetime(&mut self) -> Lifetime {
31+
let lifetime_name = format!("{}{}", self.lifetime_name, self.count);
32+
let lifetime = Lifetime::new(&lifetime_name, Span::call_site());
33+
34+
let idx = self.lifetime_index + self.count as usize;
35+
self.generics.insert(idx, GenericParam::Lifetime(LifetimeDef::new(lifetime.clone())));
36+
self.count += 1;
37+
38+
lifetime
39+
}
40+
41+
// Take an Option<Lifetime> and guarantee its an unelided lifetime
42+
fn expand_lifetime(&mut self, lifetime: Option<Lifetime>) -> Lifetime {
43+
match lifetime {
44+
Some(l) => self.fold_lifetime(l),
45+
None => self.new_lifetime(),
46+
}
47+
}
48+
}
49+
50+
impl Fold for UnelideLifetimes<'_> {
51+
// Handling self arguments
52+
fn fold_arg_self_ref(&mut self, arg: ArgSelfRef) -> ArgSelfRef {
53+
let ArgSelfRef { and_token, lifetime, mutability, self_token } = arg;
54+
let lifetime = Some(self.expand_lifetime(lifetime));
55+
ArgSelfRef { and_token, lifetime, mutability, self_token }
56+
}
57+
58+
// If the lifetime is `'_`, replace it with a new unelided lifetime
59+
fn fold_lifetime(&mut self, lifetime: Lifetime) -> Lifetime {
60+
if lifetime.ident == "_" {
61+
self.new_lifetime()
62+
} else {
63+
lifetime
64+
}
65+
}
66+
67+
// If the reference's lifetime is elided, replace it with a new unelided lifetime
68+
fn fold_type_reference(&mut self, ty_ref: TypeReference) -> TypeReference {
69+
let TypeReference { and_token, lifetime, mutability, elem } = ty_ref;
70+
let lifetime = Some(self.expand_lifetime(lifetime));
71+
let elem = Box::new(self.fold_type(*elem));
72+
TypeReference { and_token, lifetime, mutability, elem }
73+
}
74+
}
75+
76+
fn lifetime_index(generics: &Punctuated<GenericParam, Comma>) -> usize {
77+
generics
78+
.iter()
79+
.take_while(|param| if let GenericParam::Lifetime(_) = param { true } else { false })
80+
.count()
81+
}
82+
83+
// Determine the prefix for all lifetime names. Ensure it doesn't
84+
// overlap with any existing lifetime names.
85+
fn lifetime_name(generics: &Punctuated<GenericParam, Comma>) -> String {
86+
let mut lifetime_name = String::from("'_async");
87+
let existing_lifetimes: Vec<String> = generics
88+
.iter()
89+
.filter_map(|param| {
90+
if let GenericParam::Lifetime(LifetimeDef { lifetime, .. }) = param {
91+
Some(lifetime.to_string())
92+
} else {
93+
None
94+
}
95+
})
96+
.collect();
97+
while existing_lifetimes.iter().any(|name| name.starts_with(&lifetime_name)) {
98+
lifetime_name.push('_');
99+
}
100+
lifetime_name
101+
}

futures-async-macro/src/error.rs

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Make error messages a little more readable than `panic!`.
2+
macro_rules! error {
3+
($msg:expr) => {
4+
return proc_macro::TokenStream::from(
5+
syn::Error::new(proc_macro2::Span::call_site(), $msg).to_compile_error(),
6+
)
7+
};
8+
}
9+
10+
// TODO: Should we give another name?
11+
// `assert!` that call `error!` instead of `panic!`.
12+
macro_rules! assert_ {
13+
($e:expr, $msg:expr) => {
14+
if !$e {
15+
error!($msg)
16+
}
17+
};
18+
}
19+
20+
pub(super) fn expr_compile_error(e: &syn::Error) -> syn::Expr {
21+
syn::parse2(e.to_compile_error()).unwrap()
22+
}
23+
24+
// Long error messages and error messages that are called multiple times
25+
26+
macro_rules! args_is_not_empty {
27+
($name:expr) => {
28+
concat!("attribute must be of the form `#[", $name, "]`")
29+
};
30+
}
31+
32+
macro_rules! outside_of_async_error {
33+
($tokens:expr, $name:expr) => {
34+
$crate::error::expr_compile_error(&syn::Error::new_spanned(
35+
$tokens,
36+
concat!(
37+
$name,
38+
" cannot be allowed outside of \
39+
async closures, blocks, functions, async_stream blocks, and functions."
40+
),
41+
))
42+
};
43+
}
44+
45+
macro_rules! outside_of_async_stream_error {
46+
($tokens:expr, $name:expr) => {
47+
$crate::error::expr_compile_error(&syn::Error::new_spanned(
48+
$tokens,
49+
concat!($name, " cannot be allowed outside of async_stream blocks, and functions."),
50+
))
51+
};
52+
}

0 commit comments

Comments
 (0)