diff --git a/.flake8 b/.flake8 index 750fd3b..99f3985 100644 --- a/.flake8 +++ b/.flake8 @@ -1,3 +1,3 @@ [flake8] -ignore = E731, T484, T400 # Do not assign a lambda expression, use a def +ignore = E731, T484, T400, W503 max-line-length = 120 \ No newline at end of file diff --git a/aioreactive/__init__.py b/aioreactive/__init__.py index e145353..6b412cd 100644 --- a/aioreactive/__init__.py +++ b/aioreactive/__init__.py @@ -1,4 +1,4 @@ -from . import asyncrx as AsyncRx +from . import asyncrx from .observables import AsyncAnonymousObservable, AsyncObservable from .observers import AsyncAnonymousObserver, AsyncAwaitableObserver, AsyncIteratorObserver, AsyncNotificationObserver from .subject import AsyncSingleSubject, AsyncSubject @@ -13,7 +13,7 @@ "AsyncNotificationObserver", "AsyncObservable", "AsyncObserver", - "AsyncRx", + "asyncrx", "AsyncSingleSubject", "AsyncSubject", "run", diff --git a/aioreactive/asyncrx.py b/aioreactive/asyncrx.py index ce9fd8f..2186180 100644 --- a/aioreactive/asyncrx.py +++ b/aioreactive/asyncrx.py @@ -50,10 +50,10 @@ def catch(handler: Callable[[Exception], AsyncObservable[TSource]]) -> Stream[TS return catch(handler) -def delay(seconds: float) -> Callable[[AsyncObservable[TSource]], AsyncObservable[TSource]]: - from aioreactive.operators.delay import delay +def delay(seconds: float) -> Stream[TSource, TSource]: + from .timeshift import delay - return partial(delay, seconds) + return delay(seconds) def filter(predicate: Callable[[TSource], bool]) -> Callable[[AsyncObservable[TSource]], AsyncObservable[TSource]]: diff --git a/aioreactive/observers.py b/aioreactive/observers.py index 379e103..5abb2b7 100644 --- a/aioreactive/observers.py +++ b/aioreactive/observers.py @@ -1,6 +1,7 @@ import logging from asyncio import Future, iscoroutinefunction -from typing import AsyncIterable, AsyncIterator, Awaitable, Callable, List, Optional, Tuple, TypeVar +from typing import (AsyncIterable, AsyncIterator, Awaitable, Callable, List, + Optional, Tuple, TypeVar) from expression.core import MailboxProcessor from expression.system import AsyncDisposable, Disposable @@ -121,7 +122,7 @@ class AsyncNotificationObserver(AsyncObserver[TSource]): def __init__(self, fn: Callable[[Notification], Awaitable[None]]) -> None: self._fn = fn - async def asent(self, value: TSource) -> None: + async def asend(self, value: TSource) -> None: await self._fn(OnNext(value)) async def athrow(self, error: Exception) -> None: diff --git a/aioreactive/testing/__init__.py b/aioreactive/testing/__init__.py index df1b75e..4c6948d 100644 --- a/aioreactive/testing/__init__.py +++ b/aioreactive/testing/__init__.py @@ -1,5 +1,9 @@ -from .eventloop import VirtualTimeEventLoop +"""Testing module. + +Contains utilities for unit testing async observables. +""" from .observer import AsyncTestObserver from .subject import AsyncSingleSubject, AsyncSubject +from .virtual_events import VirtualTimeEventLoop -__all__ = ["VirtualTimeEventLoop", "AsyncAnonymousObserver", "AsyncSingleSubject", "AsyncSubject"] +__all__ = ["VirtualTimeEventLoop", "AsyncTestObserver", "AsyncSingleSubject", "AsyncSubject"] diff --git a/aioreactive/testing/eventloop.py b/aioreactive/testing/virtual_events.py similarity index 77% rename from aioreactive/testing/eventloop.py rename to aioreactive/testing/virtual_events.py index b4ece1f..01ca731 100644 --- a/aioreactive/testing/eventloop.py +++ b/aioreactive/testing/virtual_events.py @@ -29,6 +29,16 @@ def _format_handle(handle): class VirtualTimeEventLoop(asyncio.BaseEventLoop): + """Virtual time event loop. + + Works the same was as a normal event loop except that time is + virtual. Time starts at 0 and only advances when something is + scheduled into the future. Thus the event loops runs as quickly as + possible while producing the same output as a normal event loop + would do. This makes it ideal for unit-testing where you want to + test delays but without spending time in real life. + """ + def __init__(self) -> None: super().__init__() @@ -42,10 +52,10 @@ def time(self): return self._time def _run_once(self): - """Run one full iteration of the event loop. - This calls all currently ready callbacks, polls for I/O, - schedules the resulting callbacks, and finally schedules - 'call_later' callbacks. + """Run one full iteration of the event loop. This calls all + currently ready callbacks, polls for I/O, schedules the + resulting callbacks, and finally schedules 'call_later' + callbacks. """ log.debug("run_once()") @@ -75,16 +85,6 @@ def _run_once(self): # print("***", self._ready) # print("***", self._scheduled) - # timeout = None - # if self._ready or self._stopping: - # timeout = 0 - # elif self._scheduled: - # Compute the desired timeout. - # when = self._scheduled[0]._when - # timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT) - - # event_list = self._selector.select(timeout) - # self._process_events(event_list) # Handle 'later' callbacks that are ready. while self._scheduled and not self._ready: @@ -95,11 +95,11 @@ def _run_once(self): self._ready.append(handle) # This is the only place where callbacks are actually *called*. - # All other places just add them to ready. - # Note: We run all currently scheduled callbacks, but not any - # callbacks scheduled by callbacks run this time around -- - # they will be run the next time (after another I/O poll). - # Use an idiom that is thread-safe without using locks. + # All other places just add them to ready. Note: We run all + # currently scheduled callbacks, but not any callbacks scheduled + # by callbacks run this time around -- they will be run the next + # time (after another I/O poll). Use an idiom that is + # thread-safe without using locks. ntodo = len(self._ready) for i in range(ntodo): handle = self._ready.popleft() diff --git a/aioreactive/timeshift.py b/aioreactive/timeshift.py new file mode 100644 index 0000000..853dffb --- /dev/null +++ b/aioreactive/timeshift.py @@ -0,0 +1,72 @@ +import asyncio +from datetime import datetime, timedelta +from typing import Tuple, TypeVar + +from expression.core import MailboxProcessor, pipe +from expression.system import CancellationTokenSource + +from .notification import Notification, OnError, OnNext +from .observables import AsyncAnonymousObservable +from .observers import AsyncAnonymousObserver, AsyncNotificationObserver +from .types import AsyncDisposable, AsyncObservable, AsyncObserver, Stream + +TSource = TypeVar("TSource") + + +def delay(seconds: float) -> Stream[TSource, TSource]: + """Delay observable. + + Time shifts the observable sequence by the given timeout. The + relative time intervals between the values are preserved. + + Args: + seconds (float): Number of seconds to delay. + + Returns: + Stream[TSource, TSource]: Delayed stream. + """ + + def _delay(source: AsyncObservable[TSource]) -> AsyncObservable[TSource]: + cts = CancellationTokenSource() + + async def subscribe_async(aobv: AsyncObserver[TSource]) -> AsyncDisposable: + async def worker(inbox: MailboxProcessor[Tuple[Notification, datetime]]) -> None: + async def message_loop() -> None: + ns, due_time = await inbox.receive() + + diff = due_time - datetime.utcnow() + seconds = diff.total_seconds() + if seconds > 0: + await asyncio.sleep(seconds) + + if isinstance(ns, OnNext): + x = ns.value + await aobv.asend(x) + elif isinstance(ns, OnError): + err = ns.exception + await aobv.athrow(err) + else: + await aobv.aclose() + + await message_loop() + + await message_loop() + + agent = MailboxProcessor.start(worker, cts.token) + + async def fn(ns: Notification) -> None: + due_time = datetime.utcnow() + timedelta(seconds=seconds) + agent.post((ns, due_time)) + + obv: AsyncNotificationObserver[TSource] = AsyncNotificationObserver(fn) + subscription = await pipe(obv, source.subscribe_async) + + async def cancel() -> None: + cts.cancel() + await subscription.dispose_async() + + return AsyncDisposable.create(cancel) + + return AsyncAnonymousObservable(subscribe_async) + + return _delay diff --git a/examples/timeflies/timeflies.py b/examples/timeflies/timeflies.py index 4b80071..b16d3b7 100644 --- a/examples/timeflies/timeflies.py +++ b/examples/timeflies/timeflies.py @@ -1,39 +1,37 @@ import asyncio -from tkinter import Label, Frame, Tk +from tkinter import Event, Frame, Label, Tk -from aioreactive.core import AsyncAnonymousObserver -from aioreactive.core import AsyncStream -from aioreactive.operators.pipe import delay +from aioreactive import AsyncAnonymousObserver, AsyncSubject, asyncrx +from expression.core import pipe -async def main(loop) -> None: +async def main() -> None: root = Tk() root.title("aioreactive") - mousemoves = AsyncStream() + mousemoves: AsyncSubject[Event[Frame]] = AsyncSubject() frame = Frame(root, width=800, height=600) - async def move(event) -> None: + async def move(event: Event) -> None: await mousemoves.asend(event) - def call_move(event): + def motion(event: Event) -> None: asyncio.ensure_future(move(event)) - frame.bind("", call_move) + frame.bind("", motion) text = "TIME FLIES LIKE AN ARROW" labels = [Label(frame, text=c) for c in text] - async def handle_label(i, label) -> None: + async def handle_label(i: int, label: Label) -> None: label.config(dict(borderwidth=0, padx=0, pady=0)) - async def asend(ev) -> None: + async def asend(ev: Event) -> None: label.place(x=ev.x + i * 12 + 15, y=ev.y) obv = AsyncAnonymousObserver(asend) - - await (mousemoves | delay(i / 10.0) > obv) + await pipe(mousemoves, asyncrx.delay(i / 10.0)).subscribe_async(obv) for i, label in enumerate(labels): await handle_label(i, label) @@ -48,5 +46,5 @@ async def asend(ev) -> None: if __name__ == "__main__": loop = asyncio.get_event_loop() - loop.run_until_complete(main(loop)) + loop.run_until_complete(main()) loop.close()