Skip to content

Commit

Permalink
Fix filter tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dbrattli committed Nov 8, 2020
1 parent c81ac7d commit 532af6e
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 22 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,5 @@ TestResults/Rx.TE.Tests_log.ldf
.eggs/
.mypy_cache/
.ionide

coverage.xml
10 changes: 7 additions & 3 deletions aioreactive/__init__.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
from . import asyncrx as AsyncRx
from .observables import AsyncAnonymousObservable, AsyncObservable
from .observers import AsyncAnonymousObserver, AsyncIteratorObserver
from .observers import AsyncAnonymousObserver, AsyncAwaitableObserver, AsyncIteratorObserver, AsyncNotificationObserver
from .subject import AsyncSingleSubject, AsyncSubject
from .subscription import run
from .types import AsyncObserver, Stream

__all__ = [
"AsyncRx",
"AsyncObservable",
"AsyncAnonymousObservable",
"AsyncAnonymousObserver",
"AsyncAwaitableObserver",
"AsyncIteratorObserver",
"AsyncNotificationObserver",
"AsyncObservable",
"AsyncObserver",
"AsyncRx",
"AsyncSingleSubject",
"AsyncSubject",
"run",
"Stream",
]
7 changes: 4 additions & 3 deletions aioreactive/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ def filter_async(predicate: Callable[[TSource], Awaitable[bool]]) -> Stream[TSou
Stream[TSource, TSource]: [description]
"""

async def handler(next: Callable[[TResult], Awaitable[None]], x: TSource):
if predicate(x):
async def handler(next: Callable[[TSource], Awaitable[None]], x: TSource):
print("handler: ", x)
if await predicate(x):
return await next(x)

return transform(handler)
Expand All @@ -89,7 +90,7 @@ def filter(predicate: Callable[[TSource], bool]) -> Stream[TSource, TSource]:
Stream[TSource, TSource]: [description]
"""

def handler(next: Callable[[TResult], Awaitable[None]], x: TSource) -> Awaitable[None]:
def handler(next: Callable[[TSource], Awaitable[None]], x: TSource) -> Awaitable[None]:
if predicate(x):
return next(x)
return aio.empty
Expand Down
1 change: 1 addition & 0 deletions aioreactive/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ async def run(
# blocking the last single stream in the chain.
observer_: AsyncObserver[TSource] = observer or AsyncAwaitableObserver()
await source.subscribe_async(observer_)
log.debug("run(): waiting for observer ...")
return await asyncio.wait_for(observer_, timeout)
8 changes: 4 additions & 4 deletions aioreactive/testing/eventloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def _run_once(self):
schedules the resulting callbacks, and finally schedules
'call_later' callbacks.
"""
log.debug("run_once()")

sched_count = len(self._scheduled)
if (
Expand All @@ -72,6 +73,8 @@ def _run_once(self):
handle = heapq.heappop(self._scheduled)
handle._scheduled = False

# print("***", self._ready)
# print("***", self._scheduled)
# timeout = None
# if self._ready or self._stopping:
# timeout = 0
Expand All @@ -84,11 +87,8 @@ def _run_once(self):
# self._process_events(event_list)

# Handle 'later' callbacks that are ready.
end_time = self.time() + self._clock_resolution
while self._scheduled:
while self._scheduled and not self._ready:
handle = self._scheduled[0]
if handle._when >= end_time:
break
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
self._time = handle._when
Expand Down
25 changes: 13 additions & 12 deletions test/test_filter.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,45 @@
import pytest
import asyncio
from typing import Generator
from typing import Any, Generator

import pytest
from aioreactive import AsyncAwaitableObserver, asyncrx, run
from aioreactive.testing import VirtualTimeEventLoop
from aioreactive.operators import from_iterable, filter
from aioreactive.core import run, subscribe, AsyncAnonymousObserver, AsyncStream
from expression.core import pipe


class MyException(Exception):
pass


@pytest.yield_fixture()
def event_loop() -> Generator:
@pytest.yield_fixture() # type:ignore
def event_loop() -> Generator[Any, Any, Any]:
loop = VirtualTimeEventLoop()
yield loop
loop.close()


@pytest.mark.asyncio
async def test_filter_happy() -> None:
xs = from_iterable([1, 2, 3])
xs = asyncrx.from_iterable([1, 2, 3])
result = []

async def asend(value: int) -> None:
result.append(value)

async def predicate(value: int) -> bool:
print("sleeping")
await asyncio.sleep(0.1)
return value > 1

ys = filter(predicate, xs)
value = await run(ys, AsyncAnonymousObserver(asend))
ys = pipe(xs, asyncrx.filter_async(predicate))
value = await run(ys, AsyncAwaitableObserver(asend))
assert value == 3
assert result == [2, 3]


@pytest.mark.asyncio
async def test_filter_predicate_throws() -> None:
xs = from_iterable([1, 2, 3])
xs = asyncrx.from_iterable([1, 2, 3])
err = MyException("err")
result = []

Expand All @@ -49,9 +50,9 @@ async def predicate(value: int) -> bool:
await asyncio.sleep(0.1)
raise err

ys = filter(predicate, xs)
ys = pipe(xs, asyncrx.filter_async(predicate))

with pytest.raises(MyException):
await run(ys, AsyncAnonymousObserver(asend))
await run(ys, AsyncAwaitableObserver(asend))

assert result == []

0 comments on commit 532af6e

Please sign in to comment.