Open
Description
I have an example using pyzmq that leaks roughly 60 MB/s of memory. When I run it with the standard asyncio event loop everything works fine, but with a QEventLoop I see this problem.
import os
import asyncio
import time
import zmq
import zmq.asyncio
import numpy as np
from multiprocessing import Process
from collections import namedtuple
from PyQt5 import QtWidgets
from qasync import QEventLoop
# Address record for the publisher endpoint; `view` is the ZMQ URL that
# subscribers connect to.
# Fix: the typename must match the binding ('Addr', not 'Addrs') — a
# mismatched typename breaks pickling of instances and gives a
# misleading repr.
Addr = namedtuple('Addr', ['name', 'view'])
addr = Addr('graph', 'tcp://127.0.0.1:5557')
def run_worker():
    """Publisher process body.

    Binds a PUB socket and, roughly ten times per second, emits a
    three-part message: the topic string, an incrementing timestamp,
    and a 1024x1024 random array. Runs until interrupted, then tears
    the context down.
    """
    context = zmq.Context()
    pub = context.socket(zmq.PUB)
    pub.bind("tcp://127.0.0.1:5557")
    tick = 0
    while True:
        try:
            pub.send_string('view:graph:_auto_Projection.0.Out', zmq.SNDMORE)
            pub.send_pyobj(tick, zmq.SNDMORE)
            tick += 1
            payload = np.random.randn(1024, 1024)
            pub.send_pyobj(payload)
            time.sleep(0.1)
        except KeyboardInterrupt:
            break
    # destroy() closes the socket and terminates the context.
    context.destroy()
async def update():
    """Subscribe to the worker's PUB socket and print each received frame.

    Receives the three-part message (topic string, timestamp, array)
    in a loop forever. Fix: the socket and context were never closed,
    so cancelling this task leaked ZMQ resources — cleanup now runs in
    a ``finally`` block so it also fires on cancellation.
    """
    ctx = zmq.asyncio.Context()
    sock = ctx.socket(zmq.SUB)
    sock.setsockopt_string(zmq.SUBSCRIBE, 'view:graph:_auto_Projection.0.Out')
    sock.connect(addr.view)
    try:
        while True:
            topic = await sock.recv_string()
            heartbeat = await sock.recv_pyobj()
            reply = await sock.recv_pyobj()
            print("PID:", os.getpid(), "RECEIVED:", reply)
    finally:
        # Runs on CancelledError too — release the socket and context
        # instead of leaking them when the event loop shuts down.
        sock.close()
        ctx.term()
if __name__ == "__main__":
    # Start the publisher in a child process, then run the subscriber
    # task on a Qt-integrated asyncio event loop.
    worker = Process(target=run_worker)
    worker.start()
    app = QtWidgets.QApplication([])
    loop = QEventLoop(app)
    asyncio.set_event_loop(loop)
    task = asyncio.ensure_future(update())
    try:
        loop.run_forever()
    finally:
        if not task.done():
            task.cancel()
            try:
                # Drain the cancellation so update()'s cleanup code
                # actually runs before the loop is closed.
                loop.run_until_complete(task)
            except asyncio.CancelledError:
                pass
        loop.close()
        # Fix: the worker process was never stopped before, so it kept
        # publishing forever after the GUI exited.
        worker.terminate()
        worker.join()