forked from anp/moxie
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
zetanumbers
committed
Nov 29, 2020
1 parent
74fb3e7
commit f87b753
Showing
4 changed files
with
203 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
mod waker; | ||
use waker::*; | ||
|
||
use super::{Revision, Runtime}; | ||
use futures::{ | ||
channel::mpsc, | ||
stream::{Stream, StreamExt}, | ||
task::{noop_waker, waker, LocalSpawn}, | ||
}; | ||
use std::{ | ||
pin::Pin, | ||
task::{Context as FutContext, Poll}, | ||
}; | ||
|
||
/// A [`Runtime`] that is bound with a particular root function. | ||
/// | ||
/// If running in a context with an async executor, can be consumed as a | ||
/// [`futures::Stream`] of [`crate::runtime::Revision`]s in order to provide | ||
/// the [`super::Runtime`] with a [`std::task::Waker`]. | ||
pub struct BindStream<Root> { | ||
inner: Runtime, | ||
root: Root, | ||
changes_receiver: mpsc::Receiver<()>, | ||
} | ||
|
||
impl super::Runtime { | ||
/// Returns this runtime bound with a specific root function as | ||
/// ([`futures::stream::Stream`]). | ||
pub fn bind<Root, Out>(self, root: Root) -> BindStream<Root> | ||
where | ||
Root: FnMut() -> Out + Unpin, | ||
{ | ||
BindStream::with_runtime(self, root) | ||
} | ||
|
||
/// Returns this runtime bound with a specific root function as | ||
/// ([`futures::stream::Stream`]). | ||
pub fn bind_init<Root, Out>(self, root: Root) -> (BindStream<Root>, Out) | ||
where | ||
Root: FnMut() -> Out + Unpin, | ||
{ | ||
let mut bound = self.bind(root); | ||
let out = bound.try_next().unwrap(); | ||
(bound, out) | ||
} | ||
} | ||
|
||
impl<Root, Out> BindStream<Root> | ||
where | ||
Root: FnMut() -> Out + Unpin, | ||
{ | ||
/// Creates a new `Runtime` attached to the provided root function. | ||
pub fn new(root: Root) -> Self { | ||
let rt = Runtime::new(); | ||
Self::with_runtime(rt, root) | ||
} | ||
|
||
/// Binds `Runtime` to the provided root function. | ||
pub fn with_runtime(mut rt: super::Runtime, root: Root) -> Self { | ||
let (mut changes_sender, changes_receiver) = mpsc::channel(0); | ||
changes_sender.try_send(()).unwrap(); | ||
rt.set_state_change_waker(waker(BindStreamWaker::new(changes_sender))); | ||
Self { inner: rt, root, changes_receiver } | ||
} | ||
|
||
/// Returns output of the next revision if at least state change exists | ||
pub fn try_next(&mut self) -> Option<Out> { | ||
let wk = noop_waker(); | ||
let mut cx = FutContext::from_waker(&wk); | ||
match Pin::new(self).poll_next(&mut cx) { | ||
Poll::Ready(out) => out, | ||
Poll::Pending => None, | ||
} | ||
} | ||
|
||
/// Creates a new `Runtime` attached to the provided root function, which | ||
/// then runs once. | ||
pub fn init(root: Root) -> (Self, Out) { | ||
let mut bound = Self::new(root); | ||
let out = bound.try_next().unwrap(); | ||
(bound, out) | ||
} | ||
|
||
/// Binds `Runtime` to the provided root function, then runs it once. | ||
pub fn init_with_runtime(rt: super::Runtime, root: Root) -> (Self, Out) { | ||
let mut bound = Self::with_runtime(rt, root); | ||
let out = bound.try_next().unwrap(); | ||
(bound, out) | ||
} | ||
|
||
/// Returns the runtime's current Revision. | ||
pub fn revision(&self) -> Revision { | ||
self.inner.revision() | ||
} | ||
|
||
/// Sets the executor that will be used to spawn normal priority tasks. | ||
pub fn set_task_executor(&mut self, sp: impl LocalSpawn + 'static) { | ||
self.inner.set_task_executor(sp); | ||
} | ||
|
||
/// Poll this runtime without exiting. Discards any value returned from the | ||
/// root function. The future yields in between revisions and is woken on | ||
/// state changes. | ||
pub async fn run_on_state_changes(mut self) { | ||
loop { | ||
self.next().await; | ||
} | ||
} | ||
|
||
/// Unbinds the runtime from its current root function, returning both. | ||
/// Resets waker to `noop_waker`. | ||
pub fn unbind(self) -> (Runtime, Root) { | ||
let Self { mut inner, root, .. } = self; | ||
inner.set_state_change_waker(noop_waker()); | ||
(inner, root) | ||
} | ||
} | ||
|
||
impl<Root, Out> Stream for BindStream<Root> | ||
where | ||
Root: FnMut() -> Out + Unpin, | ||
{ | ||
type Item = Out; | ||
|
||
/// This `Stream` implementation, if change present, runs a single revision | ||
/// for a call to `poll_next`, returning `Poll::Ready(Some(...))`, otherwise | ||
/// returns `Poll::Pending` | ||
fn poll_next(self: Pin<&mut Self>, cx: &mut FutContext<'_>) -> Poll<Option<Self::Item>> { | ||
let this = self.get_mut(); | ||
Pin::new(&mut this.changes_receiver) | ||
.poll_next(cx) | ||
.map(|received| received.map(|_| this.inner.run_once(&mut this.root))) | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use super::*; | ||
|
||
#[test] | ||
fn pending_without_change() { | ||
use futures::{ | ||
executor::{block_on, LocalPool}, | ||
stream::StreamExt, | ||
task::LocalSpawnExt, | ||
}; | ||
|
||
let mut brt = BindStream::new(|| ()); | ||
block_on(brt.next()).expect("BindStream should yield first revision immediately"); | ||
let mut pool = LocalPool::new(); | ||
pool.spawner() | ||
.spawn_local(async move { | ||
brt.next().await.unwrap(); | ||
unreachable!() | ||
}) | ||
.unwrap(); | ||
assert!(!pool.try_run_one()); | ||
} | ||
|
||
#[test] | ||
fn has_changes() { | ||
use futures::{executor::LocalPool, task::LocalSpawnExt}; | ||
|
||
let (mut brt, key) = BindStream::init(|| crate::state(|| 0).1); | ||
let mut pool = LocalPool::new(); | ||
pool.spawner() | ||
.spawn_local(async move { | ||
brt.next().await; | ||
}) | ||
.unwrap(); | ||
assert!(!pool.try_run_one()); | ||
key.set(0); | ||
assert!(!pool.try_run_one()); | ||
key.set(1); | ||
assert!(pool.try_run_one()); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
use futures::channel::mpsc; | ||
use std::sync::{Arc, Mutex}; | ||
|
||
pub use futures::task::ArcWake; | ||
|
||
pub struct BindStreamWaker { | ||
inner: Mutex<mpsc::Sender<()>>, | ||
} | ||
|
||
impl BindStreamWaker { | ||
pub fn new(inner: mpsc::Sender<()>) -> Arc<Self> { | ||
Arc::new(Self { inner: Mutex::new(inner) }) | ||
} | ||
} | ||
|
||
impl ArcWake for BindStreamWaker { | ||
fn wake_by_ref(arc_self: &Arc<Self>) { | ||
let this = Arc::as_ref(arc_self); | ||
// Ignore error on disconnected or full | ||
drop(this.inner.try_lock().unwrap().try_send(())); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters