Hey everyone,
I am building a small app that listens to a WebSocket stream and parses the received events. I tried to mock the basic functionality in the following code snippet:
import asyncio
import random

import rx
import rx.operators as ops
from rx.scheduler.eventloop import AsyncIOScheduler

TRIGGER_EVENT = 10


def mock_stream(loop):
    def on_subscribe(observer, scheduler):
        async def get_deal_pracentage():
            i = 0
            # Busy loop: without an await it never yields to the event loop.
            while True:
                value = random.randint(0, 10000000)
                if value == TRIGGER_EVENT:
                    print("sending data to observable with payload:", i)
                    observer.on_next(i)
                    # await asyncio.sleep(0.1)
                i += 1

        # The coroutine takes no arguments.
        loop.create_task(get_deal_pracentage())

    return rx.create(on_subscribe)


async def foo(i):
    await asyncio.sleep(4)
    return i + 1


loop = asyncio.get_event_loop()
ticks = rx.interval(2.0)
# Pass the emitted item on to foo.
obs = mock_stream(loop).pipe(ops.flat_map(lambda i: rx.from_future(
    loop.create_task(foo(i)))))
obs.subscribe(
    on_next=lambda item: print("observer received payload:", item),
    scheduler=AsyncIOScheduler(loop)
)
loop.run_forever()
For some reason, the observer does not receive the emitted item. When I add await asyncio.sleep() in the observable, the observer does receive the event.
How can the observer receive the item without adding await asyncio.sleep()? And why does it only work when I add await asyncio.sleep()?
I am pretty new to rxpy and asyncio, so unrelated feedback about the code is very welcome :)
The issue here is that the asyncio event loop is blocked forever.
The while loop in get_deal_pracentage runs on your application's asyncio event loop. Without the sleep call, the get_deal_pracentage task never finishes and never yields control back to the event loop, so no other task gets a chance to run. As a consequence, the rest of the code is never scheduled or executed.
With the call to sleep, get_deal_pracentage is suspended at the await. Asyncio then schedules one of the other pending tasks, including the code in flat_map.
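To make the scheduling behaviour concrete, here is a minimal sketch that uses plain asyncio only (no rxpy). The names producer, consumer and run are made up for illustration, and the loop is finite so the script terminates; with the original while True and no await, the consumer would never run at all.

```python
import asyncio


async def producer(events, yield_control):
    # Stand-in for the busy loop; finite here so the example terminates.
    for i in range(3):
        events.append(f"producer {i}")
        if yield_control:
            # sleep(0) suspends this task and lets other ready tasks run.
            await asyncio.sleep(0)


async def consumer(events):
    # Stand-in for "the rest of the application" (e.g. the flat_map work).
    events.append("consumer ran")


async def run(yield_control):
    events = []
    prod = asyncio.create_task(producer(events, yield_control))
    cons = asyncio.create_task(consumer(events))
    await asyncio.gather(prod, cons)
    return events


# No await in the loop: the producer runs to completion before anything else.
blocking_order = asyncio.run(run(False))
# With an await in the loop: the consumer gets a turn after the first step.
cooperative_order = asyncio.run(run(True))

print(blocking_order)
print(cooperative_order)
```

In the first run the consumer only gets its turn once the producer has finished. In the second run it is scheduled as soon as the producer hits its first await, which is the same effect the commented-out asyncio.sleep(0.1) has in the snippet from the question.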
If you are new to both rxpy and asyncio, starting by using them together is not an easy learning path!
In my actual implementation, I'm receiving 3 events one after the other. When I use asyncio.wait, I lose some data (only one event is passed to the observer).
Is there any other way to wait for an event while making sure that the rest of the application keeps running?