Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Added delay. Fixed timefiles example
  • Loading branch information
dbrattli committed Nov 8, 2020
1 parent 532af6e commit bd2a48b
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 43 deletions.
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[flake8]
ignore = E731, T484, T400 # Do not assign a lambda expression, use a def
ignore = E731, T484, T400, W503
max-line-length = 120
4 changes: 2 additions & 2 deletions aioreactive/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from . import asyncrx as AsyncRx
from . import asyncrx
from .observables import AsyncAnonymousObservable, AsyncObservable
from .observers import AsyncAnonymousObserver, AsyncAwaitableObserver, AsyncIteratorObserver, AsyncNotificationObserver
from .subject import AsyncSingleSubject, AsyncSubject
Expand All @@ -13,7 +13,7 @@
"AsyncNotificationObserver",
"AsyncObservable",
"AsyncObserver",
"AsyncRx",
"asyncrx",
"AsyncSingleSubject",
"AsyncSubject",
"run",
Expand Down
6 changes: 3 additions & 3 deletions aioreactive/asyncrx.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ def catch(handler: Callable[[Exception], AsyncObservable[TSource]]) -> Stream[TS
return catch(handler)


def delay(seconds: float) -> Callable[[AsyncObservable[TSource]], AsyncObservable[TSource]]:
from aioreactive.operators.delay import delay
def delay(seconds: float) -> Stream[TSource, TSource]:
from .timeshift import delay

return partial(delay, seconds)
return delay(seconds)


def filter(predicate: Callable[[TSource], bool]) -> Callable[[AsyncObservable[TSource]], AsyncObservable[TSource]]:
Expand Down
5 changes: 3 additions & 2 deletions aioreactive/observers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
from asyncio import Future, iscoroutinefunction
from typing import AsyncIterable, AsyncIterator, Awaitable, Callable, List, Optional, Tuple, TypeVar
from typing import (AsyncIterable, AsyncIterator, Awaitable, Callable, List,
Optional, Tuple, TypeVar)

from expression.core import MailboxProcessor
from expression.system import AsyncDisposable, Disposable
Expand Down Expand Up @@ -121,7 +122,7 @@ class AsyncNotificationObserver(AsyncObserver[TSource]):
def __init__(self, fn: Callable[[Notification], Awaitable[None]]) -> None:
self._fn = fn

async def asent(self, value: TSource) -> None:
async def asend(self, value: TSource) -> None:
await self._fn(OnNext(value))

async def athrow(self, error: Exception) -> None:
Expand Down
8 changes: 6 additions & 2 deletions aioreactive/testing/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from .eventloop import VirtualTimeEventLoop
"""Testing module.
Contains utilities for unit testing async observables.
"""
from .observer import AsyncTestObserver
from .subject import AsyncSingleSubject, AsyncSubject
from .virtual_events import VirtualTimeEventLoop

__all__ = ["VirtualTimeEventLoop", "AsyncAnonymousObserver", "AsyncSingleSubject", "AsyncSubject"]
__all__ = ["VirtualTimeEventLoop", "AsyncTestObserver", "AsyncSingleSubject", "AsyncSubject"]
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ def _format_handle(handle):


class VirtualTimeEventLoop(asyncio.BaseEventLoop):
"""Virtual time event loop.
Works the same was as a normal event loop except that time is
virtual. Time starts at 0 and only advances when something is
scheduled into the future. Thus the event loops runs as quickly as
possible while producing the same output as a normal event loop
would do. This makes it ideal for unit-testing where you want to
test delays but without spending time in real life.
"""

def __init__(self) -> None:
super().__init__()

Expand All @@ -42,10 +52,10 @@ def time(self):
return self._time

def _run_once(self):
"""Run one full iteration of the event loop.
This calls all currently ready callbacks, polls for I/O,
schedules the resulting callbacks, and finally schedules
'call_later' callbacks.
"""Run one full iteration of the event loop. This calls all
currently ready callbacks, polls for I/O, schedules the
resulting callbacks, and finally schedules 'call_later'
callbacks.
"""
log.debug("run_once()")

Expand Down Expand Up @@ -75,16 +85,6 @@ def _run_once(self):

# print("***", self._ready)
# print("***", self._scheduled)
# timeout = None
# if self._ready or self._stopping:
# timeout = 0
# elif self._scheduled:
# Compute the desired timeout.
# when = self._scheduled[0]._when
# timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

# event_list = self._selector.select(timeout)
# self._process_events(event_list)

# Handle 'later' callbacks that are ready.
while self._scheduled and not self._ready:
Expand All @@ -95,11 +95,11 @@ def _run_once(self):
self._ready.append(handle)

# This is the only place where callbacks are actually *called*.
# All other places just add them to ready.
# Note: We run all currently scheduled callbacks, but not any
# callbacks scheduled by callbacks run this time around --
# they will be run the next time (after another I/O poll).
# Use an idiom that is thread-safe without using locks.
# All other places just add them to ready. Note: We run all
# currently scheduled callbacks, but not any callbacks scheduled
# by callbacks run this time around -- they will be run the next
# time (after another I/O poll). Use an idiom that is
# thread-safe without using locks.
ntodo = len(self._ready)
for i in range(ntodo):
handle = self._ready.popleft()
Expand Down
72 changes: 72 additions & 0 deletions aioreactive/timeshift.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import asyncio
from datetime import datetime, timedelta
from typing import Tuple, TypeVar

from expression.core import MailboxProcessor, pipe
from expression.system import CancellationTokenSource

from .notification import Notification, OnError, OnNext
from .observables import AsyncAnonymousObservable
from .observers import AsyncAnonymousObserver, AsyncNotificationObserver
from .types import AsyncDisposable, AsyncObservable, AsyncObserver, Stream

TSource = TypeVar("TSource")


def delay(seconds: float) -> Stream[TSource, TSource]:
"""Delay observable.
Time shifts the observable sequence by the given timeout. The
relative time intervals between the values are preserved.
Args:
seconds (float): Number of seconds to delay.
Returns:
Stream[TSource, TSource]: Delayed stream.
"""

def _delay(source: AsyncObservable[TSource]) -> AsyncObservable[TSource]:
cts = CancellationTokenSource()

async def subscribe_async(aobv: AsyncObserver[TSource]) -> AsyncDisposable:
async def worker(inbox: MailboxProcessor[Tuple[Notification, datetime]]) -> None:
async def message_loop() -> None:
ns, due_time = await inbox.receive()

diff = due_time - datetime.utcnow()
seconds = diff.total_seconds()
if seconds > 0:
await asyncio.sleep(seconds)

if isinstance(ns, OnNext):
x = ns.value
await aobv.asend(x)
elif isinstance(ns, OnError):
err = ns.exception
await aobv.athrow(err)
else:
await aobv.aclose()

await message_loop()

await message_loop()

agent = MailboxProcessor.start(worker, cts.token)

async def fn(ns: Notification) -> None:
due_time = datetime.utcnow() + timedelta(seconds=seconds)
agent.post((ns, due_time))

obv: AsyncNotificationObserver[TSource] = AsyncNotificationObserver(fn)
subscription = await pipe(obv, source.subscribe_async)

async def cancel() -> None:
cts.cancel()
await subscription.dispose_async()

return AsyncDisposable.create(cancel)

return AsyncAnonymousObservable(subscribe_async)

return _delay
26 changes: 12 additions & 14 deletions examples/timeflies/timeflies.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,37 @@
import asyncio
from tkinter import Label, Frame, Tk
from tkinter import Event, Frame, Label, Tk

from aioreactive.core import AsyncAnonymousObserver
from aioreactive.core import AsyncStream
from aioreactive.operators.pipe import delay
from aioreactive import AsyncAnonymousObserver, AsyncSubject, asyncrx
from expression.core import pipe


async def main(loop) -> None:
async def main() -> None:
root = Tk()
root.title("aioreactive")

mousemoves = AsyncStream()
mousemoves: AsyncSubject[Event[Frame]] = AsyncSubject()

frame = Frame(root, width=800, height=600)

async def move(event) -> None:
async def move(event: Event) -> None:
await mousemoves.asend(event)

def call_move(event):
def motion(event: Event) -> None:
asyncio.ensure_future(move(event))

frame.bind("<Motion>", call_move)
frame.bind("<Motion>", motion)

text = "TIME FLIES LIKE AN ARROW"
labels = [Label(frame, text=c) for c in text]

async def handle_label(i, label) -> None:
async def handle_label(i: int, label: Label) -> None:
label.config(dict(borderwidth=0, padx=0, pady=0))

async def asend(ev) -> None:
async def asend(ev: Event) -> None:
label.place(x=ev.x + i * 12 + 15, y=ev.y)

obv = AsyncAnonymousObserver(asend)

await (mousemoves | delay(i / 10.0) > obv)
await pipe(mousemoves, asyncrx.delay(i / 10.0)).subscribe_async(obv)

for i, label in enumerate(labels):
await handle_label(i, label)
Expand All @@ -48,5 +46,5 @@ async def asend(ev) -> None:

if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.run_until_complete(main())
loop.close()

0 comments on commit bd2a48b

Please sign in to comment.