diff --git a/.flake8 b/.flake8 index 103ea2f..26d8cff 100644 --- a/.flake8 +++ b/.flake8 @@ -1,7 +1,7 @@ [flake8] max-line-length = 80 ignore = - E701 + E701, E131, W504 exclude = .git, .venv, diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 0eb1cd0..7265c54 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -57,4 +57,4 @@ jobs: - name: Run pytest shell: bash - run: python -m pytest -p no:sugar tests/ + run: python -m pytest -p no:sugar --log-cli-level=DEBUG --capture=tee-sys tests diff --git a/README.md b/README.md index 982fd60..a31b200 100644 --- a/README.md +++ b/README.md @@ -29,58 +29,109 @@ from zmq_tubes.threads import TubeNode, Tube # Threads classes ## Usage: ### Node definitions in yml file -We can define all tubes for one TubeNode by yml file. - +We can define all tubes for one TubeNode by yml file. +Next examples require install these packages `PyYAML`, `pyzmq` and `zmq_tubes`. +#### Client service (asyncio example) ```yaml -# test.yml +# client.yml tubes: - name: Client REQ - addr: ipc:///tmp/req.pipe + addr: ipc:///tmp/req.pipe tube_type: REQ topics: - - foo/# - - +/bar + - foo/bar - name: Client PUB - addr: ipc:///tmp/pub.pipe + addr: ipc:///tmp/pub.pipe tube_type: PUB topics: - foo/pub/# +``` + +```python +# client.py +import asyncio +import yaml +from zmq_tubes import TubeNode, TubeMessage + + +async def run(): + with open('client.yml', 'r+') as fd: + schema = yaml.safe_load(fd) + node = TubeNode(schema=schema) + async with node: + print(await node.request('foo/bar', 'message 1')) + await node.publish('foo/pub/test', 'message 2') - - name: Server ROUTER - addr: ipc:///tmp/router.pipe +if __name__ == "__main__": + asyncio.run(run()) +``` +```shell +> python client.py +topic: foo/bar, payload: response +``` + + +#### Server service (threads example) +```yaml +# server.yml +tubes: + - name: server ROUTER + addr: ipc:///tmp/req.pipe tube_type: ROUTER - server: yes - sockopts: - LINGER: 0 + server: True + topics: + - foo/bar + + - name: server SUB + addr: ipc:///tmp/pub.pipe + tube_type: SUB + server: True topics: - - server/# + - foo/pub/# ``` ```python -import asyncio +# server.py import yaml -from zmq_tubes import TubeNode, TubeMessage +from zmq_tubes.threads import TubeNode, TubeMessage -async def handler(request: TubeMessage): +def handler(request: TubeMessage): print(request.payload) - return request.create_response('response') + if request.tube.tube_type_name == 'ROUTER': + return request.create_response('response') -async def run(): - with open('test.yml', 'r+') as fd: +def run(): + with open('server.yml', 'r+') as fd: schema = yaml.safe_load(fd) node = TubeNode(schema=schema) - node.register_handler('server/#', handler) - async with node: - await node.publish('foo/pub/test', 'message 1') - print(await node.request('foo/xxx', 'message 2')) + node.register_handler('foo/#', handler) + with node: + node.start().join() + +if __name__ == "__main__": + run() +``` -asyncio.run(run()) +```shell +> python server.py +message 1 +message 2 ``` +### YAML definition +The yaml file starts with a root element `tubes`, which contains list of all our tube definitions. +- `name` - string - name of the tube. +- `addr` - string - connection or bind address in format `transport://address` (see more http://api.zeromq.org/2-1:zmq-connect) +- `server` - bool - is this tube server side (bind to `addr`) or client side (connect to `addr`) +- `tube_type` - string - type of this tube (see more https://zguide.zeromq.org/docs/chapter2/#Messaging-Patterns) +- `identity` - string - (optional) we can setup custom tube identity +- `utf8_decoding` - bool - (default = True), if this is True, the payload is automatically UTF8 decode. +- `sockopts` - dict - (optional) we can setup sockopts for this tube (see more http://api.zeromq.org/4-2:zmq-setsockopt) +- `monitor` - string - (optional) bind address of tube monitor (see more [Debugging / Monitoring](#debugging-/-monitoring)) ### Request / Response @@ -113,7 +164,6 @@ await node.start() ``` #### Client: - ```python from zmq_tubes import Tube, TubeNode @@ -129,8 +179,8 @@ response = await node.request('test/xxx', 'question') print(response.payload) # output: 'answer' ``` - - +The method `request` accepts the optional parameter `utf8_decoding`. When we set this parameter to `False` in previous +example, the returned payload is not automatically decoded, we get bytes. ### Subscribe / Publisher diff --git a/tests/async/test_dealer_dealer.py b/tests/async/test_dealer_dealer.py index 3191bce..1ab1c61 100644 --- a/tests/async/test_dealer_dealer.py +++ b/tests/async/test_dealer_dealer.py @@ -1,10 +1,9 @@ import sys -import asyncio - import zmq import pytest +from tests.helpers import wait_for_result from zmq_tubes import Tube, TubeNode pytestmark = pytest.mark.skipif(sys.version_info < (3, 7), @@ -16,20 +15,28 @@ @pytest.fixture def data(): - return ['REQ10', 'REQ11', 'REQ20', 'REQ21'] + return ['REQ10', 'REQ11', 'REQ20', 'REQ21'].copy() + + +@pytest.fixture +def result(): + return [] -@pytest.fixture(params=[{'server': True}]) -def dealer_node1(request, data): +@pytest.fixture(params=[{'server': True, 'utf8_decoding': True}]) +def dealer_node1(request, result): async def __process(req): - data.remove(req.payload) - await req.tube.send(req.create_response(f'DEALER1-{req.payload[-2:]}')) + result.append(req.payload) + await req.tube.send(req.create_response( + f'DEALER1{req.payload[-2:]}' if request.param['utf8_decoding'] + else b'DEALER1' + req.payload[-2:])) tube = Tube( name='DEALER1', addr=ADDR, server=request.param['server'], - tube_type=zmq.DEALER + tube_type=zmq.DEALER, + utf8_decoding=request.param['utf8_decoding'] ) node = TubeNode() @@ -38,13 +45,14 @@ async def __process(req): return node -@pytest.fixture(params=[{'server': False}]) +@pytest.fixture(params=[{'server': False, 'utf8_decoding': True}]) def dealer_node2(request): tube = Tube( name='DEALER2', addr=ADDR, server=request.param['server'], - tube_type=zmq.DEALER + tube_type=zmq.DEALER, + utf8_decoding=request.param['utf8_decoding'] ) node = TubeNode() @@ -52,34 +60,37 @@ def dealer_node2(request): return node +################################################################################ +# Tests +################################################################################ + + @pytest.mark.asyncio -async def test_dealer_dealer(dealer_node1, dealer_node2, data): +async def test_dealer_dealer(dealer_node1, dealer_node2, data, result): res = [] async def __process(req): res.append(req.payload) - dealer_node2.register_handler(f"{TOPIC}/#", __process) - + result.clear() async with dealer_node1, dealer_node2: - for _ in range(len(data)): - await dealer_node2.send(f"{TOPIC}/A", data[0]) - await asyncio.sleep(.2) - await asyncio.sleep(1) - - assert len(res) == 4 - assert len(data) == 0 + while data: + await dealer_node2.send(f"{TOPIC}/A", data.pop()) + assert await wait_for_result( + lambda: len(res) == 4 and len(result) == 4, + timeout=1 + ) @pytest.mark.asyncio -async def test_dealer_dealer_on_same_node(dealer_node1, data): +async def test_dealer_dealer_on_same_node(dealer_node1, data, result): res = [] async def __process(req): res.append(req.payload) - + result.clear() tube = Tube( - name='DEALER1', + name='DEALER2', addr=ADDR, server=False, tube_type=zmq.DEALER @@ -89,10 +100,31 @@ async def __process(req): dealer_node1.register_handler(f"{TOPIC}/#", __process, tube) async with dealer_node1: - for _ in range(len(data)): - await dealer_node1.send(f"{TOPIC}/A", data[0]) - await asyncio.sleep(.2) - await asyncio.sleep(1) + while data: + await dealer_node1.send(f"{TOPIC}/A", data.pop()) + assert await wait_for_result( + lambda: len(res) == 4 and len(result) == 4, + timeout=1 + ) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("dealer_node1,dealer_node2", + [({'server': True, 'utf8_decoding': False}, + {'server': False, 'utf8_decoding': False})], + indirect=["dealer_node1", "dealer_node2"]) +async def test_dealer_reps_bytes(dealer_node1, dealer_node2, result): + res = [] - assert len(res) == 4 - assert len(data) == 0 + async def __process(req): + res.append(req.payload) + dealer_node2.register_handler(f"{TOPIC}/#", __process) + + result.clear() + async with dealer_node1, dealer_node2: + await dealer_node2.send(f"{TOPIC}/A", 'XXX') + assert await wait_for_result( + lambda: len(res) == 1 and isinstance(res[0], bytes) and + len(result) == 1 and isinstance(result[0], bytes), + timeout=1 + ) diff --git a/tests/async/test_dealer_rep.py b/tests/async/test_dealer_rep.py index d19591e..6e1b626 100644 --- a/tests/async/test_dealer_rep.py +++ b/tests/async/test_dealer_rep.py @@ -1,10 +1,8 @@ import sys - -import asyncio - import zmq import pytest +from tests.helpers import wait_for_result from zmq_tubes import Tube, TubeNode pytestmark = pytest.mark.skipif(sys.version_info < (3, 7), @@ -16,21 +14,32 @@ @pytest.fixture def data(): - return ['REQ10', 'REQ11'] + return ['REQ10', 'REQ11'].copy() @pytest.fixture def data2(): - return ['REQ20', 'REQ21'] + return ['REQ20', 'REQ21'].copy() + + +@pytest.fixture +def result(): + return [] + + +@pytest.fixture +def result2(): + return [] -@pytest.fixture(params=[{'server': True}]) +@pytest.fixture(params=[{'server': True, 'utf8_decoding': True}]) def dealer_node(request): tube = Tube( name='DEALER1', addr=ADDR, server=request.param['server'], - tube_type=zmq.DEALER + tube_type=zmq.DEALER, + utf8_decoding=request.param['utf8_decoding'] ) node = TubeNode() @@ -38,18 +47,20 @@ def dealer_node(request): return node -@pytest.fixture(params=[{'server': False}]) -def resp_node1(data, request): +@pytest.fixture(params=[{'server': False, 'utf8_decoding': True}]) +def resp_node1(result, request): async def __process(req): - if req.payload in data: - data.remove(req.payload) - return req.create_response(f'RESP1-{req.payload[-2:]}') + result.append(req.payload) + return req.create_response( + f'RESP1{req.payload[-2:]}' if request.param['utf8_decoding'] + else b'RESP1' + req.payload[-2:]) tube = Tube( name='RESP1', addr=ADDR, server=request.param['server'], - tube_type=zmq.REP + tube_type=zmq.REP, + utf8_decoding=request.param['utf8_decoding'] ) node = TubeNode() @@ -58,18 +69,21 @@ async def __process(req): return node -@pytest.fixture(params=[{'server': False}]) -def resp_node2(data2, request): +@pytest.fixture(params=[{'server': False, 'utf8_decoding': True}]) +def resp_node2(result2, request): async def __process(req): - if req.payload in data2: - data2.remove(req.payload) - return req.create_response(f'RESP2-{req.payload[-2:]}') + result2.append(req.payload) + return req.create_response( + f'RESP2{req.payload[-2:]}' if request.param['utf8_decoding'] + else b'RESP2' + req.payload[-2:] + ) tube = Tube( name='RESP2', addr=ADDR, server=request.param['server'], - tube_type=zmq.REP + tube_type=zmq.REP, + utf8_decoding=request.param['utf8_decoding'] ) node = TubeNode() @@ -78,48 +92,46 @@ async def __process(req): return node -# @pytest.mark.asyncio -# async def test_dealer_reps(dealer_node, resp_node1, resp_node2, data, data2): -# res = [] -# -# async def __process(req): -# res.append(req.payload) -# -# dealer_node.register_handler(f"{TOPIC}/#", __process, -# dealer_node.get_tube_by_name('DEALER1')) -# -# async with dealer_node, resp_node1, resp_node2: -# _d1 = data.copy() -# _d2 = data2.copy() -# for _ in range(len(data)): -# await dealer_node.send(f"{TOPIC}/A", _d1.pop()) -# await asyncio.sleep(0.1) -# await dealer_node.send(f"{TOPIC}/B", _d2.pop()) -# await asyncio.sleep(0.1) -# for _ in range(200): -# # We have to wait, before close nodes. -# if len(res) == 4: -# break -# await asyncio.sleep(0.2) -# -# assert len(res) == 4 -# assert len(data) == 0 -# assert len(data2) == 0 - +################################################################################ +# Tests +################################################################################ @pytest.mark.asyncio -async def test_dealer_reps_on_same_node(dealer_node, data): +async def test_dealer_reps(dealer_node, resp_node1, resp_node2, data, data2, + result, result2): + """ + One dealer send request to two resp servers + """ res = [] async def __process(req): res.append(req.payload) + dealer_node.register_handler(f"{TOPIC}/#", __process) + result.clear() + result2.clear() + async with dealer_node, resp_node1, resp_node2: + while data: + await dealer_node.send(f"{TOPIC}/A", data.pop()) + await dealer_node.send(f"{TOPIC}/B", data2.pop()) + assert await wait_for_result( + lambda: len(res) == 4 and len(result) == 2 and len(result2) == 2, + timeout=3 + ) + +@pytest.mark.asyncio +async def test_dealer_reps_on_same_node(dealer_node, data, result): + res = [] + + async def __process(req): + res.append(req.payload) dealer_node.register_handler(f"{TOPIC}/#", __process, dealer_node.tubes[0]) async def __process_resp(req): - data.remove(req.payload) + result.append(req.payload) return req.create_response(f'RESP-{req.payload[-2:]}') + result.clear() tube = Tube( name='RESP', addr=ADDR, @@ -130,10 +142,30 @@ async def __process_resp(req): dealer_node.register_handler(f"{TOPIC}/#", __process_resp, tube) async with dealer_node: - for _ in range(len(data)): - await dealer_node.send(f"{TOPIC}/A", data[0]) - await asyncio.sleep(.1) - await asyncio.sleep(1) + while data: + await dealer_node.send(f"{TOPIC}/A", data.pop()) + assert await wait_for_result( + lambda: len(res) == 2 and len(result) == 2, + timeout=3 + ) + - assert len(res) == 2 - assert len(data) == 0 +@pytest.mark.asyncio +@pytest.mark.parametrize("dealer_node,resp_node1", + [({'server': True, 'utf8_decoding': False}, + {'server': False, 'utf8_decoding': False})], + indirect=["dealer_node", "resp_node1"]) +async def test_dealer_reps_bytes(dealer_node, resp_node1, result): + res = [] + + async def __process(req): + res.append(req.payload) + dealer_node.register_handler(f"{TOPIC}/#", __process) + result.clear() + async with dealer_node, resp_node1: + await dealer_node.send(f"{TOPIC}/A", 'XXX') + assert await wait_for_result( + lambda: len(res) == 1 and isinstance(res[0], bytes) and + len(result) == 1 and isinstance(result[0], bytes), + timeout=3 + ) diff --git a/tests/async/test_dealer_router.py b/tests/async/test_dealer_router.py index 21a52b5..2ed230f 100644 --- a/tests/async/test_dealer_router.py +++ b/tests/async/test_dealer_router.py @@ -5,6 +5,7 @@ import zmq import pytest +from tests.helpers import wait_for_result from zmq_tubes import Tube, TubeNode pytestmark = pytest.mark.skipif(sys.version_info < (3, 7), @@ -16,22 +17,30 @@ @pytest.fixture def data(): - return ['REQ10', 'REQ11', 'REQ20', 'REQ21'] + return ['REQ10', 'REQ11', 'REQ20', 'REQ21'].copy() -@pytest.fixture(params=[{'server': True}]) -def router_node(data, request): +@pytest.fixture +def result(): + return [] + + +@pytest.fixture(params=[{'server': True, 'utf8_decoding': True}]) +def router_node(result, request): async def __process(req): - data.remove(req.payload) - if 'REQ10' in req.payload: + result.append(req.payload) + if isinstance(req.payload, str) and 'REQ10' in req.payload: await asyncio.sleep(.3) - return req.create_response(f'RESP{req.payload[-2:]}') + return req.create_response( + f'RESP1{req.payload[-2:]}' if request.param['utf8_decoding'] + else b'RESP1' + req.payload[-2:]) tube = Tube( name='ROUTER', addr=ADDR, server=request.param['server'], - tube_type=zmq.ROUTER + tube_type=zmq.ROUTER, + utf8_decoding=request.param['utf8_decoding'] ) node = TubeNode() @@ -40,14 +49,14 @@ async def __process(req): return node -@pytest.fixture(params=[{'server': False}]) -def dealer_node1(request): - +@pytest.fixture(params=[{'server': False, 'utf8_decoding': True}]) +def dealer_node(request): tube = Tube( name='DEALER1', addr=ADDR, server=request.param['server'], - tube_type=zmq.DEALER + tube_type=zmq.DEALER, + utf8_decoding=request.param['utf8_decoding'] ) node = TubeNode() @@ -55,45 +64,69 @@ def dealer_node1(request): return node -@pytest.mark.asyncio -async def test_router_dealer(router_node, dealer_node1, data): +################################################################################ +# Tests +################################################################################ + +@pytest.mark.asyncio +async def test_router_dealer(router_node, dealer_node, data, result): res = [] async def __process(req): res.append(req.payload) - - dealer_node1.register_handler(f"{TOPIC}/#", __process) - - async with router_node, dealer_node1: - for _ in range(len(data)): - await dealer_node1.send(f"{TOPIC}/A", data[0]) - await asyncio.sleep(.1) - - assert len(res) == 4 - assert len(data) == 0 + dealer_node.register_handler(f"{TOPIC}/#", __process) + result.clear() + async with router_node, dealer_node: + while data: + await dealer_node.send(f"{TOPIC}/A", data.pop()) + assert await wait_for_result( + lambda: len(res) == 4 and len(result) == 4, + timeout=1 + ) @pytest.mark.asyncio -async def test_router_dealer_on_same_node(router_node, data): +async def test_router_dealer_on_same_node(router_node, data, result): res = [] async def __process(req): res.append(req.payload) - tube = Tube( name='DEALER', addr=ADDR, server=False, tube_type=zmq.DEALER ) + result.clear() router_node.register_tube(tube, f"{TOPIC}/#") router_node.register_handler(f"{TOPIC}/#", __process, tube) async with router_node: - for _ in range(len(data)): - await router_node.send(f"{TOPIC}/A", data[0], tube) - await asyncio.sleep(.1) + while data: + await router_node.send(f"{TOPIC}/A", data.pop(), tube) + assert await wait_for_result( + lambda: len(res) == 4 and len(result) == 4, + timeout=1 + ) - assert len(res) == 4 - assert len(data) == 0 + +@pytest.mark.asyncio +@pytest.mark.parametrize("router_node,dealer_node", + [({'server': True, 'utf8_decoding': False}, + {'server': False, 'utf8_decoding': False})], + indirect=["router_node", "dealer_node"]) +async def test_router_dealer_bytes(router_node, dealer_node, result): + res = [] + + async def __process(req): + res.append(req.payload) + dealer_node.register_handler(f"{TOPIC}/#", __process) + result.clear() + async with router_node, dealer_node: + await dealer_node.send(f"{TOPIC}/A", 'XXX') + assert await wait_for_result( + lambda: len(res) == 1 and isinstance(res[0], bytes) and + len(result) == 1 and isinstance(result[0], bytes), + timeout=1 + ) diff --git a/tests/async/test_req_rep.py b/tests/async/test_req_rep.py index a006355..ae6cdd5 100644 --- a/tests/async/test_req_rep.py +++ b/tests/async/test_req_rep.py @@ -4,6 +4,7 @@ import zmq import pytest +from tests.helpers import wait_for_result from zmq_tubes.manager import TubeMessageTimeout from zmq_tubes import Tube, TubeNode @@ -16,22 +17,36 @@ @pytest.fixture def data(): - return ['REQ10', 'REQ11', 'REQ20', 'REQ21'] + return ['REQ10', 'REQ11'].copy() -@pytest.fixture(params=[{'server': True, 'sleep': None}]) -def resp_node(data, request): +@pytest.fixture +def data2(): + return ['REQ20', 'REQ21'].copy() + + +@pytest.fixture +def result(): + return [] + + +@pytest.fixture(params=[{'server': True, 'sleep': None, 'utf8_decoding': True}]) +def resp_node(result, request): async def __process(req): - data.remove(req.payload) + result.append(req.payload) if request.param['sleep']: await asyncio.sleep(request.param['sleep']) - return req.create_response(f'RESP{req.payload[-2:]}') + return req.create_response( + f'RESP{req.payload[-2:]}' if request.param['utf8_decoding'] else + b'RESP' + req.payload[-2:] + ) tube = Tube( name='REP', addr=ADDR, server=request.param['server'], - tube_type=zmq.REP + tube_type=zmq.REP, + utf8_decoding=request.param['utf8_decoding'] ) node = TubeNode() @@ -40,13 +55,14 @@ async def __process(req): return node -@pytest.fixture(params=[{'server': False}]) +@pytest.fixture(params=[{'server': False, 'utf8_decoding': True}]) def req_node1(request): tube = Tube( name='REQ1', addr=ADDR, server=request.param['server'], - tube_type=zmq.REQ + tube_type=zmq.REQ, + utf8_decoding=request.param['utf8_decoding'] ) node = TubeNode() @@ -54,13 +70,14 @@ def req_node1(request): return node -@pytest.fixture(params=[{'server': False}]) +@pytest.fixture(params=[{'server': False, 'utf8_decoding': True}]) def req_node2(request): tube = Tube( name='REQ2', addr=ADDR, server=request.param['server'], - tube_type=zmq.REQ + tube_type=zmq.REQ, + utf8_decoding=request.param['utf8_decoding'] ) node = TubeNode() @@ -68,35 +85,37 @@ def req_node2(request): return node +################################################################################ +# Tests +################################################################################ + @pytest.mark.asyncio -async def test_resp_reqs(resp_node, req_node1, req_node2, data): +async def test_resp_reqs(resp_node, req_node1, req_node2, data, data2, result): res = [] + result.clear() - async def step(node, p, delay=None): + async def step(node, d, p, delay=None): if delay: await asyncio.sleep(delay) # distance between tasks - while data: - resp = await node.request(f"{TOPIC}/{p}", data[0], timeout=1) + while d: + resp = await node.request(f"{TOPIC}/{p}", d.pop(), timeout=1) res.append('RESP' in resp.payload) async with resp_node: await asyncio.gather( - asyncio.create_task(step(req_node1, 'A')), - asyncio.create_task(step(req_node2, 'B', delay=.1)) + asyncio.create_task(step(req_node1, data, 'A')), + asyncio.create_task(step(req_node2, data2, 'B', delay=.05)) + ) + assert await wait_for_result( + lambda: len(res) == 4 and all(res) and len(result) == 4, + timeout=1 ) - assert all(res) - assert len(data) == 0 @pytest.mark.asyncio -async def test_req_resp_on_same_node(resp_node, data): +async def test_req_resp_on_same_node(resp_node, data, result): res = [] - - async def step(node, p, d): - for it in d: - resp = await node.request(f"{TOPIC}/{p}", it, timeout=1) - res.append('RESP' in resp.payload) - + result.clear() tube = Tube( name='REQ', addr=ADDR, @@ -104,27 +123,45 @@ async def step(node, p, d): tube_type=zmq.REQ ) resp_node.register_tube(tube, f"{TOPIC}/#") - async with resp_node: - await step(resp_node, 'A', data.copy()) - assert all(res) - assert len(data) == 0 + while data: + resp = await resp_node.request(f"{TOPIC}/A", data.pop(), timeout=1) + res.append('RESP' in resp.payload) + assert await wait_for_result( + lambda: len(res) == 2 and all(res) and len(result) == 2, + timeout=1 + ) @pytest.mark.asyncio @pytest.mark.parametrize("resp_node", - [({'server': True, 'sleep': 3})], + [({'server': True, 'sleep': 1, + 'utf8_decoding': True})], indirect=["resp_node"]) async def test_req_resp_timeout(resp_node, req_node1, data): - res = [] - - async def step(node, p): + async with resp_node: try: - await node.request(f"{TOPIC}/{p}", data[0], timeout=.5) - res.append(False) + await req_node1.request(f"{TOPIC}/A", data.pop(), timeout=.1) + assert False except TubeMessageTimeout: - res.append(True) + assert True + +@pytest.mark.asyncio +@pytest.mark.parametrize("resp_node,req_node1", + [({'server': True, 'sleep': None, + 'utf8_decoding': False}, + {'server': False, 'utf8_decoding': False})], + indirect=["resp_node", "req_node1"]) +async def test_req_resp_bytes(resp_node, req_node1, data, result): + result.clear() async with resp_node: - await step(req_node1, 'A') - assert all(res) + rr = await req_node1.request(f"{TOPIC}/A", data.pop()) + assert isinstance(rr.payload, bytes) + rr = await req_node1.request(f"{TOPIC}/A", data.pop(), + utf8_decoding=True) + assert not isinstance(rr.payload, bytes) + assert await wait_for_result( + lambda: len(result) == 2 and isinstance(result[0], bytes), + timeout=1 + ) diff --git a/tests/async/test_req_router.py b/tests/async/test_req_router.py index 99b0e11..76699a3 100644 --- a/tests/async/test_req_router.py +++ b/tests/async/test_req_router.py @@ -3,6 +3,7 @@ import sys import pytest +from tests.helpers import wait_for_result from zmq_tubes import Tube, TubeNode pytestmark = pytest.mark.skipif(sys.version_info < (3, 7), @@ -14,22 +15,35 @@ @pytest.fixture def data(): - return ['REQ10', 'REQ11', 'REQ20', 'REQ21'] + return ['REQ10', 'REQ11'].copy() -@pytest.fixture(params=[{'server': True}]) -def router_node(data, request): +@pytest.fixture +def data2(): + return ['REQ20', 'REQ21'].copy() + + +@pytest.fixture +def result(): + return [] + + +@pytest.fixture(params=[{'server': True, 'utf8_decoding': True}]) +def router_node(result, request): async def __process(req): - data.remove(req.payload) - if 'REQ10' in req.payload: - await asyncio.sleep(.3) - return req.create_response(f'RESP{req.payload[-2:]}') + result.append(req.payload) + if isinstance(req.payload, str) and 'REQ10' in req.payload: + await asyncio.sleep(.1) + return req.create_response( + f'RESP1{req.payload[-2:]}' if request.param['utf8_decoding'] + else b'RESP1' + req.payload[-2:]) tube = Tube( name='ROUTER', addr=ADDR, server=request.param['server'], - tube_type=zmq.ROUTER + tube_type=zmq.ROUTER, + utf8_decoding=request.param['utf8_decoding'] ) node = TubeNode() @@ -38,13 +52,14 @@ async def __process(req): return node -@pytest.fixture(params=[{'server': False}]) +@pytest.fixture(params=[{'server': False, 'utf8_decoding': True}]) def req_node1(request): tube = Tube( name='REQ1', addr=ADDR, server=request.param['server'], - tube_type=zmq.REQ + tube_type=zmq.REQ, + utf8_decoding=request.param['utf8_decoding'] ) node = TubeNode() @@ -52,13 +67,14 @@ def req_node1(request): return node -@pytest.fixture(params=[{'server': False}]) +@pytest.fixture(params=[{'server': False, 'utf8_decoding': True}]) def req_node2(request): tube = Tube( name='REQ2', addr=ADDR, server=request.param['server'], - tube_type=zmq.REQ + tube_type=zmq.REQ, + utf8_decoding=request.param['utf8_decoding'] ) node = TubeNode() @@ -66,40 +82,39 @@ def req_node2(request): return node -@pytest.mark.asyncio -async def test_router_reqs(router_node, req_node1, req_node2, data): +################################################################################ +# Tests +################################################################################ +@pytest.mark.asyncio +async def test_router_reqs(router_node, req_node1, req_node2, data, data2, + result): res = [] - async def step(node, p, delay=None): + async def step(node, d, p, delay=None): if delay: await asyncio.sleep(delay) # distance between tasks - while data: - resp = await node.request(f"{TOPIC}/{p}", data[0], timeout=1) + while d: + resp = await node.request(f"{TOPIC}/{p}", d.pop(), timeout=1) res.append('RESP' in resp.payload) - + result.clear() async with router_node: await asyncio.gather( - asyncio.create_task(step(req_node1, 'A')), - asyncio.create_task(step(req_node2, 'B', delay=.1)) + asyncio.create_task(step(req_node1, data, 'A')), + asyncio.create_task(step(req_node2, data2, 'B', delay=.1)) + ) + assert await wait_for_result( + lambda: len(res) == 4 and len(result) == 4, + timeout=1 ) - assert all(res) - assert len(data) == 0 @pytest.mark.asyncio -async def test_req_router_on_same_node(router_node, data): +async def test_req_router_on_same_node(router_node, data, result): """ The REQ/ROUTER and client on the same node. """ - res = [] - - async def step(node, p): - for _ in range(len(data)): - resp = await node.request(f"{TOPIC}/{p}", data[0], timeout=1) - res.append('RESP' in resp.payload) - tube = Tube( name='REQ', addr=ADDR, @@ -107,8 +122,31 @@ async def step(node, p): tube_type=zmq.REQ ) router_node.register_tube(tube, f"{TOPIC}/#") + result.clear() + async with router_node: + while data: + resp = await router_node.request(f"{TOPIC}/A", data.pop(), + timeout=1) + res.append('RESP' in resp.payload) + assert await wait_for_result( + lambda: len(res) == 2 and len(result) == 2, + timeout=1 + ) + +@pytest.mark.asyncio +@pytest.mark.parametrize("router_node,req_node1", + [({'server': True, 'utf8_decoding': False}, + {'server': False, 'utf8_decoding': False})], + indirect=["router_node", "req_node1"]) +async def test_req_router_bytes(router_node, req_node1, result): + result.clear() async with router_node: - await step(router_node, 'A') - assert all(res) - assert len(data) == 0 + res = await req_node1.request(f"{TOPIC}/A", 'XXX1') + assert isinstance(res.payload, bytes) + res = await req_node1.request(f"{TOPIC}/A", 'XXX2', utf8_decoding=True) + assert not isinstance(res.payload, bytes) + assert await wait_for_result( + lambda: len(result) == 2 and isinstance(result[0], bytes), + timeout=1 + ) diff --git a/tests/async/test_sub_pub.py b/tests/async/test_sub_pub.py index 17a5e89..a97ba2e 100644 --- a/tests/async/test_sub_pub.py +++ b/tests/async/test_sub_pub.py @@ -1,10 +1,10 @@ import sys import asyncio - -import zmq import pytest +import zmq +from tests.helpers import wait_for_result from zmq_tubes import Tube, TubeNode pytestmark = pytest.mark.skipif(sys.version_info < (3, 7), @@ -16,24 +16,35 @@ @pytest.fixture def data(): - return ['PUB10', 'PUB11', 'PUB20', 'PUB21'] + return ['PUB10', 'PUB11', 'PUB20', 'PUB21'].copy() @pytest.fixture def data2(): - return ['PUB10', 'PUB11', 'PUB20', 'PUB21'] + return ['PUB10', 'PUB11', 'PUB20', 'PUB21'].copy() + +@pytest.fixture +def result(): + return [] + + +@pytest.fixture +def result2(): + return [] -@pytest.fixture(params=[{'server': True}]) -def sub_node(data, request): + +@pytest.fixture(params=[{'server': True, 'utf8_decoding': True}]) +def sub_node(result, request): async def __process(req): - data.remove(req.payload) + result.append(req.payload) tube = Tube( name='SUB1', addr=ADDR, server=request.param['server'], - tube_type=zmq.SUB + tube_type=zmq.SUB, + utf8_decoding=request.param['utf8_decoding'] ) node = TubeNode() @@ -42,16 +53,17 @@ async def __process(req): return node -@pytest.fixture(params=[{'server': True}]) -def sub_node2(data2, request): +@pytest.fixture(params=[{'server': True, 'utf8_decoding': True}]) +def sub_node2(result2, request): async def __process(req): - data2.remove(req.payload) + result2.append(req.payload) tube = Tube( name='SUB2', addr=ADDR, server=request.param['server'], - tube_type=zmq.SUB + tube_type=zmq.SUB, + utf8_decoding=request.param['utf8_decoding'] ) node = TubeNode() @@ -88,59 +100,55 @@ def pub_node2(request): return node +################################################################################ +# Tests +################################################################################ + + @pytest.mark.asyncio -async def test_sub_pubs(sub_node, pub_node1, pub_node2, data): +async def test_sub_pubs(sub_node, pub_node1, pub_node2, data, data2, + result): """ The subscriber is server and two clients publish messages. """ - - async def step(node1, node2): + result.clear() + async with sub_node, pub_node1, pub_node2: await asyncio.sleep(.2) # we have to wait for server is ready while data: - await node1.publish(f"{TOPIC}/A", data[0]) - await node2.publish(f"{TOPIC}/B", data[1]) - await asyncio.sleep(.1) # we have to give server time for work - - async with sub_node, pub_node1, pub_node2: - await step(pub_node1, pub_node2) - - assert len(data) == 0 + await pub_node1.publish(f"{TOPIC}/A", data.pop()) + await pub_node2.publish(f"{TOPIC}/B", data2.pop()) + assert await wait_for_result(lambda: len(result) == 8, timeout=1) @pytest.mark.asyncio @pytest.mark.parametrize("sub_node,sub_node2,pub_node1", - [({'server': False}, {'server': False}, + [({'server': False, 'utf8_decoding': True}, + {'server': False, 'utf8_decoding': True}, {'server': True})], indirect=["sub_node", "sub_node2", "pub_node1"]) -async def test_pub_subs(sub_node, sub_node2, pub_node1, data, data2): +async def test_pub_subs(sub_node, sub_node2, pub_node1, data, result, result2): """ The publisher is server and two clients subscribe messages. """ - async def step(node): + result.clear() + result2.clear() + async with sub_node, sub_node2, pub_node1: await asyncio.sleep(.2) # we have to wait for server is ready - for _ in range(len(data)): - await node.publish(f"{TOPIC}/A", data[0]) + while data: + await pub_node1.publish(f"{TOPIC}/A", data.pop()) await asyncio.sleep(.1) # we have to give server time for work - - async with sub_node, sub_node2, pub_node1: - await step(pub_node1) - - assert len(data) == 0 - assert len(data2) == 0 + assert await wait_for_result( + lambda: len(result) == 4 and len(result2) == 4, + timeout=1 + ) @pytest.mark.asyncio -async def test_pub_sub_on_same_node(sub_node, data): +async def test_pub_sub_on_same_node(sub_node, data, result): """ The publisher and client on the same node. """ - - async def step(node): - await asyncio.sleep(.2) # we have to wait for server is ready - for _ in range(len(data)): - await node.publish(f"{TOPIC}/A", data[0]) - await asyncio.sleep(.1) # we have to give server time for work - + result.clear() tube = Tube( name='PUB', addr=ADDR, @@ -150,6 +158,22 @@ async def step(node): sub_node.register_tube(tube, f"{TOPIC}/#") async with sub_node: - await step(sub_node) + await asyncio.sleep(.2) # we have to wait for server is ready + while data: + await sub_node.publish(f"{TOPIC}/A", data.pop()) + assert await wait_for_result(lambda: len(result) == 4, timeout=1) - assert len(data) == 0 + +@pytest.mark.asyncio +@pytest.mark.parametrize("sub_node", + [({'server': True, 'utf8_decoding': False})], + indirect=["sub_node"]) +async def test_pub_sub_bytes(sub_node, pub_node1, result): + result.clear() + async with sub_node, pub_node1: + await asyncio.sleep(.2) # we have to wait for server is ready + await pub_node1.publish(f"{TOPIC}/A", 'XXX') + assert await wait_for_result( + lambda: len(result) > 0 and isinstance(result.pop(), bytes), + timeout=1 + ) diff --git a/tests/helpers.py b/tests/helpers.py index feae5a9..f5f115b 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -1,9 +1,43 @@ import asyncio import logging import threading +import time from threading import Thread +async def wait_for_result(condition, timeout, params=None): + if not params: + params = [] + + async def __test(): + while True: + try: + if condition(*params): + return + except Exception: + pass + await asyncio.sleep(0.1) + try: + await asyncio.wait_for(__test(), timeout=timeout) + return True + except asyncio.TimeoutError: + return False + + +def wait_for_result2(condition, timeout, params=None): + if not params: + params = [] + while timeout > 0: + try: + if condition(*params): + return True + except Exception: + pass + time.sleep(0.1) + timeout -= 0.1 + return False + + def _handle_task_result(task: asyncio.Task) -> None: try: task.result() diff --git a/tests/test_load_schema.py b/tests/test_load_schema.py index 5e442b4..b39107b 100644 --- a/tests/test_load_schema.py +++ b/tests/test_load_schema.py @@ -25,6 +25,7 @@ def test_load_schema_simple(): assert node.tubes[0].is_server assert node.tubes[0].tube_type_name == 'REP' assert node.tubes[0].identity == 'XXX' + assert node.tubes[0].utf8_decoding assert node.get_tube_by_topic('foo/aaa') is not None assert node.get_tube_by_topic('xxx/bar') is not None assert node.get_tube_by_topic('xxx/aaa') is None @@ -43,6 +44,7 @@ def test_load_schema_hierarchy(): - name: tube2 addr: ipc:///tmp/test.pipe tube_type: REQ + utf8_decoding: False topics: - foo/test/# - +/bar/test @@ -53,5 +55,6 @@ def test_load_schema_hierarchy(): assert len(node.tubes) == 2 assert node.get_tube_by_topic('foo/aaa').name == 'tube1' assert node.get_tube_by_topic('foo/test/aaa').name == 'tube2' + assert not node.get_tube_by_topic('foo/test/aaa').utf8_decoding assert node.get_tube_by_topic('xxx/bar').name == 'tube1' assert node.get_tube_by_topic('xxx/bar/test').name == 'tube2' diff --git a/tests/threads/test_dealer_dealer.py b/tests/threads/test_dealer_dealer.py index 2520272..6d66241 100644 --- a/tests/threads/test_dealer_dealer.py +++ b/tests/threads/test_dealer_dealer.py @@ -1,8 +1,7 @@ -import pytest -import time - import zmq +import pytest +from tests.helpers import wait_for_result2 as wait_for_result from zmq_tubes.threads import Tube, TubeNode ADDR = 'ipc:///tmp/dealer_dealer.pipe' @@ -11,20 +10,28 @@ @pytest.fixture def data(): - return ['REQ10', 'REQ11', 'REQ20', 'REQ21'] + return ['REQ10', 'REQ11', 'REQ20', 'REQ21'].copy() + + +@pytest.fixture +def result(): + return [] -@pytest.fixture(params=[{'server': True}]) -def dealer_node1(request, data): +@pytest.fixture(params=[{'server': True, 'utf8_decoding': True}]) +def dealer_node1(request, result): def __process(req): - data.remove(req.payload) - req.tube.send(req.create_response(f'DEALER1-{req.payload[-2:]}')) + result.append(req.payload) + req.tube.send(req.create_response( + f'DEALER1{req.payload[-2:]}' if request.param['utf8_decoding'] + else b'DEALER1' + req.payload[-2:])) tube = Tube( name='DEALER1', addr=ADDR, server=request.param['server'], - tube_type=zmq.DEALER + tube_type=zmq.DEALER, + utf8_decoding=request.param['utf8_decoding'] ) node = TubeNode() @@ -33,13 +40,14 @@ def __process(req): return node -@pytest.fixture(params=[{'server': False}]) +@pytest.fixture(params=[{'server': False, 'utf8_decoding': True}]) def dealer_node2(request): tube = Tube( name='DEALER2', addr=ADDR, server=request.param['server'], - tube_type=zmq.DEALER + tube_type=zmq.DEALER, + utf8_decoding=request.param['utf8_decoding'] ) node = TubeNode() @@ -47,31 +55,36 @@ def dealer_node2(request): return node -def test_dealer_dealer(dealer_node1, dealer_node2, data): +################################################################################ +# Tests +################################################################################ + + +@pytest.mark.asyncio +def test_dealer_dealer(dealer_node1, dealer_node2, data, result): res = [] def __process(req): res.append(req.payload) - dealer_node2.register_handler(f"{TOPIC}/#", __process) with dealer_node1, dealer_node2: - for it in data.copy(): - dealer_node2.send(f"{TOPIC}/A", it) - time.sleep(.5) + while data: + dealer_node2.send(f"{TOPIC}/A", data.pop()) + assert wait_for_result( + lambda: len(res) == 4 and len(result) == 4, + timeout=1 + ) - assert len(res) == 4 - assert len(data) == 0 - -def test_dealer_dealer_on_same_node(dealer_node1, data): +def test_dealer_dealer_on_same_node(dealer_node1, data, result): res = [] def __process(req): res.append(req.payload) - + result.clear() tube = Tube( - name='DEALER1', + name='DEALER2', addr=ADDR, server=False, tube_type=zmq.DEALER @@ -81,9 +94,30 @@ def __process(req): dealer_node1.register_handler(f"{TOPIC}/#", __process, tube) with dealer_node1: - for it in data.copy(): - dealer_node1.send(f"{TOPIC}/A", it) - time.sleep(1) + while data: + dealer_node1.send(f"{TOPIC}/A", data.pop()) + assert wait_for_result( + lambda: len(res) == 4 and len(result) == 4, + timeout=1 + ) + + +@pytest.mark.parametrize("dealer_node1,dealer_node2", + [({'server': True, 'utf8_decoding': False}, + {'server': False, 'utf8_decoding': False})], + indirect=["dealer_node1", "dealer_node2"]) +def test_dealer_reps_bytes(dealer_node1, dealer_node2, result): + res = [] - assert len(res) == 4 - assert len(data) == 0 + def __process(req): + res.append(req.payload) + dealer_node2.register_handler(f"{TOPIC}/#", __process) + + result.clear() + with dealer_node1, dealer_node2: + dealer_node2.send(f"{TOPIC}/A", 'XXX') + assert wait_for_result( + lambda: len(res) == 1 and isinstance(res[0], bytes) and + len(result) == 1 and isinstance(result[0], bytes), + timeout=1 + ) diff --git a/tests/threads/test_dealer_rep.py b/tests/threads/test_dealer_rep.py index 14e0f27..5f59e44 100644 --- a/tests/threads/test_dealer_rep.py +++ b/tests/threads/test_dealer_rep.py @@ -1,7 +1,7 @@ import pytest -import time import zmq +from tests.helpers import wait_for_result2 from zmq_tubes.threads import Tube, TubeNode ADDR = 'ipc:///tmp/dealer_rep.pipe' @@ -10,21 +10,32 @@ @pytest.fixture def data(): - return ['REQ10', 'REQ11'] + return ['REQ10', 'REQ11'].copy() @pytest.fixture def data2(): - return ['REQ20', 'REQ21'] + return ['REQ20', 'REQ21'].copy() -@pytest.fixture(params=[{'server': True}]) +@pytest.fixture +def result(): + return [] + + +@pytest.fixture +def result2(): + return [] + + +@pytest.fixture(params=[{'server': True, 'utf8_decoding': True}]) def dealer_node(request): tube = Tube( - name='DEALER', + name='DEALER1', addr=ADDR, server=request.param['server'], - tube_type=zmq.DEALER + tube_type=zmq.DEALER, + utf8_decoding=request.param['utf8_decoding'] ) node = TubeNode() @@ -32,79 +43,89 @@ def dealer_node(request): return node -@pytest.fixture(params=[{'server': False}]) -def resp_node1(data, request): +@pytest.fixture(params=[{'server': False, 'utf8_decoding': True}]) +def resp_node1(result, request): def __process(req): - if req.payload in data: - data.remove(req.payload) - return req.create_response(f'RESP{req.payload[-2:]}') + result.append(req.payload) + return req.create_response( + f'RESP1{req.payload[-2:]}' if request.param['utf8_decoding'] + else b'RESP1' + req.payload[-2:]) tube = Tube( - name='REP1', + name='RESP1', addr=ADDR, server=request.param['server'], - tube_type=zmq.REP + tube_type=zmq.REP, + utf8_decoding=request.param['utf8_decoding'] ) node = TubeNode() - node.register_tube(tube, f"{TOPIC}/#") - node.register_handler(f"{TOPIC}/#", __process) + node.register_tube(tube, f"{TOPIC}/A") + node.register_handler(f"{TOPIC}/A", __process, tube) return node -@pytest.fixture(params=[{'server': False}]) -def resp_node2(data2, request): +@pytest.fixture(params=[{'server': False, 'utf8_decoding': True}]) +def resp_node2(result2, request): def __process(req): - if req.payload in data2: - data2.remove(req.payload) - return req.create_response(f'RESP{req.payload[-2:]}') + result2.append(req.payload) + return req.create_response( + f'RESP2{req.payload[-2:]}' if request.param['utf8_decoding'] + else b'RESP2' + req.payload[-2:] + ) tube = Tube( - name='REP2', + name='RESP2', addr=ADDR, server=request.param['server'], - tube_type=zmq.REP + tube_type=zmq.REP, + utf8_decoding=request.param['utf8_decoding'] ) node = TubeNode() - node.register_tube(tube, f"{TOPIC}/#") - node.register_handler(f"{TOPIC}/#", __process) + node.register_tube(tube, f"{TOPIC}/B") + node.register_handler(f"{TOPIC}/B", __process, tube) return node -def test_dealer_reps(dealer_node, resp_node1, resp_node2, data, data2): +################################################################################ +# Tests +################################################################################ + +def test_dealer_reps(dealer_node, resp_node1, resp_node2, data, data2, + result, result2): + """ + One dealer send request to two resp servers + """ res = [] def __process(req): res.append(req.payload) - dealer_node.register_handler(f"{TOPIC}/#", __process) - + result.clear() + result2.clear() with dealer_node, resp_node1, resp_node2: - d1 = data.copy() - d2 = data2.copy() - while d1 and d2: - dealer_node.send(f"{TOPIC}/A", d1.pop(0)) - dealer_node.send(f"{TOPIC}/A", d2.pop(0)) - time.sleep(1) + while data: + dealer_node.send(f"{TOPIC}/A", data.pop()) + dealer_node.send(f"{TOPIC}/B", data2.pop()) + assert wait_for_result2( + lambda: len(res) == 4 and len(result) == 2 and len(result2) == 2, + timeout=1 + ) - assert len(res) == 4 - assert len(data) == 0 - assert len(data2) == 0 - -def test_dealer_reps_on_same_node(dealer_node, data): +def test_dealer_reps_on_same_node(dealer_node, data, result): res = [] def __process(req): res.append(req.payload) - dealer_node.register_handler(f"{TOPIC}/#", __process, dealer_node.tubes[0]) def __process_resp(req): - data.remove(req.payload) + result.append(req.payload) return req.create_response(f'RESP-{req.payload[-2:]}') + result.clear() tube = Tube( name='RESP', addr=ADDR, @@ -115,9 +136,30 @@ def __process_resp(req): dealer_node.register_handler(f"{TOPIC}/#", __process_resp, tube) with dealer_node: - for it in data.copy(): - dealer_node.send(f"{TOPIC}/A", it) - time.sleep(.1) + while data: + dealer_node.send(f"{TOPIC}/A", data.pop()) + assert wait_for_result2( + lambda: len(res) == 2 and len(result) == 2, + timeout=1 + ) + + +@pytest.mark.parametrize("dealer_node,resp_node1", + [({'server': True, 'utf8_decoding': False}, + {'server': False, 'utf8_decoding': False})], + indirect=["dealer_node", "resp_node1"]) +def test_dealer_reps_bytes(dealer_node, resp_node1, result): + res = [] - assert len(res) == 2 - assert len(data) == 0 + def __process(req): + res.append(req.payload) + + dealer_node.register_handler(f"{TOPIC}/#", __process) + result.clear() + with dealer_node, resp_node1: + dealer_node.send(f"{TOPIC}/A", 'XXX') + assert wait_for_result2( + lambda: len(res) == 1 and isinstance(res[0], bytes) and + len(result) == 1 and isinstance(result[0], bytes), + timeout=1 + ) diff --git a/tests/threads/test_dealer_router.py b/tests/threads/test_dealer_router.py index c7b4830..c767575 100644 --- a/tests/threads/test_dealer_router.py +++ b/tests/threads/test_dealer_router.py @@ -1,8 +1,9 @@ -import pytest import time import zmq +import pytest +from tests.helpers import wait_for_result2 as wait_for_result from zmq_tubes.threads import Tube, TubeNode ADDR = 'ipc:///tmp/dealer_router.pipe' @@ -11,22 +12,30 @@ @pytest.fixture def data(): - return ['REQ10', 'REQ11', 'REQ20', 'REQ21'] + return ['REQ10', 'REQ11', 'REQ20', 'REQ21'].copy() + + +@pytest.fixture +def result(): + return [] -@pytest.fixture(params=[{'server': True}]) -def router_node(data, request): +@pytest.fixture(params=[{'server': True, 'utf8_decoding': True}]) +def router_node(result, request): def __process(req): - data.remove(req.payload) - if 'REQ10' in req.payload: + result.append(req.payload) + if isinstance(req.payload, str) and 'REQ10' in req.payload: time.sleep(.3) - return req.create_response(f'RESP{req.payload[-2:]}') + return req.create_response( + f'RESP1{req.payload[-2:]}' if request.param['utf8_decoding'] + else b'RESP1' + req.payload[-2:]) tube = Tube( name='ROUTER', addr=ADDR, server=request.param['server'], - tube_type=zmq.ROUTER + tube_type=zmq.ROUTER, + utf8_decoding=request.param['utf8_decoding'] ) node = TubeNode() @@ -35,13 +44,14 @@ def __process(req): return node -@pytest.fixture(params=[{'server': False}]) +@pytest.fixture(params=[{'server': False, 'utf8_decoding': True}]) def dealer_node(request): tube = Tube( - name='DEALER', + name='DEALER1', addr=ADDR, server=request.param['server'], - tube_type=zmq.DEALER + tube_type=zmq.DEALER, + utf8_decoding=request.param['utf8_decoding'] ) node = TubeNode() @@ -49,44 +59,66 @@ def dealer_node(request): return node -def test_dealer_router(router_node, dealer_node, data): +################################################################################ +# Tests +################################################################################ + + +def test_router_dealer(router_node, dealer_node, data, result): res = [] def __process(req): res.append(req.payload) - dealer_node.register_handler(f"{TOPIC}/#", __process) - + result.clear() with router_node, dealer_node: - for it in data.copy(): - dealer_node.send(f"{TOPIC}/A", it) - time.sleep(.1) - time.sleep(.1) - - assert len(res) == 4 - assert len(data) == 0 + while data: + dealer_node.send(f"{TOPIC}/A", data.pop()) + assert wait_for_result( + lambda: len(res) == 4 and len(result) == 4, + timeout=1 + ) -def test_dealer_router_on_same_node(router_node, data): +def test_dealer_router_on_same_node(router_node, data, result): res = [] def __process(req): res.append(req.payload) - tube = Tube( name='DEALER', addr=ADDR, server=False, tube_type=zmq.DEALER ) + result.clear() router_node.register_tube(tube, f"{TOPIC}/#") router_node.register_handler(f"{TOPIC}/#", __process, tube) with router_node: - for it in data.copy(): - router_node.send(f"{TOPIC}/A", it) - time.sleep(.1) - time.sleep(.1) + while data: + router_node.send(f"{TOPIC}/A", data.pop()) + assert wait_for_result( + lambda: len(res) == 4 and len(result) == 4, + timeout=1 + ) + + +@pytest.mark.parametrize("router_node,dealer_node", + [({'server': True, 'utf8_decoding': False}, + {'server': False, 'utf8_decoding': False})], + indirect=["router_node", "dealer_node"]) +def test_router_dealer_bytes(router_node, dealer_node, result): + res = [] - assert len(res) == 4 - assert len(data) == 0 + def __process(req): + res.append(req.payload) + dealer_node.register_handler(f"{TOPIC}/#", __process) + result.clear() + with router_node, dealer_node: + dealer_node.send(f"{TOPIC}/A", 'XXX') + assert wait_for_result( + lambda: len(res) == 1 and isinstance(res[0], bytes) and + len(result) == 1 and isinstance(result[0], bytes), + timeout=1 + ) diff --git a/tests/threads/test_monitor.py b/tests/threads/test_monitor.py index 2223d43..1109457 100644 --- a/tests/threads/test_monitor.py +++ b/tests/threads/test_monitor.py @@ -5,6 +5,7 @@ import pytest import zmq +from tests.helpers import wait_for_result2 as wait_for_result from zmq_tubes.threads import Tube, TubeNode, TubeMonitor from zmq_tubes.monitoring import get_schema, logs, simulate @@ -15,13 +16,18 @@ @pytest.fixture def data(): - return ['REQ10', 'REQ11'] + return ['REQ10', 'REQ11'].copy() + + +@pytest.fixture +def result(): + return [] @pytest.fixture(params=[{'server': True}]) -def resp_node(data, request): +def resp_node(result, request): def __process(req): - data.remove(req.payload) + result.append(req.payload) return req.create_response(f'RESP{req.payload[-2:]}') tube = Tube( @@ -54,6 +60,11 @@ def req_node1(request): return node +################################################################################ +# Tests +################################################################################ + + def test_schema(resp_node): with resp_node: schema = get_schema(MONITOR) @@ -67,27 +78,31 @@ def test_schema(resp_node): assert tube.get('monitor') == MONITOR -def test_logging(resp_node, req_node1, data): - +def test_logging(resp_node, req_node1, data, result): + result.clear() buffer = io.BytesIO() th = Thread(target=lambda: logs(MONITOR, buffer, True, False), daemon=True) th.start() with resp_node: - for _ in range(len(data)): - res = req_node1.request(f"{TOPIC}/A", data[0]) + while data: + res = req_node1.request(f"{TOPIC}/A", data.pop(0)) assert 'RESP' in res.payload + assert wait_for_result( + lambda: len(result) == 2, + timeout=1 + ) th.join(timeout=2) lines = buffer.getvalue().decode().split('\n') + print(lines) assert len(lines) >= 4 assert lines[0].endswith('REP < req/A REQ10') assert lines[1].endswith('REP > req/A RESP10') assert lines[2].endswith('REP < req/A REQ11') assert lines[3].endswith('REP > req/A RESP11') - assert len(data) == 0 -def test_simulation(resp_node, data): +def test_simulation(resp_node, result): dump = io.BytesIO( b'0.017551183700561523 REP < req/A REQ10\n' b'0.00019621849060058594 REP > req/A RESP10\n' @@ -101,8 +116,10 @@ def test_simulation(resp_node, data): addr: {ADDR} tube_type: REQ """) - + result.clear() with resp_node: simulate(schema, dump, 1) - - assert len(data) == 0 + assert wait_for_result( + lambda: len(result) == 2, + timeout=1 + ) diff --git a/tests/threads/test_req_rep.py b/tests/threads/test_req_rep.py index 2519a91..80bbe43 100644 --- a/tests/threads/test_req_rep.py +++ b/tests/threads/test_req_rep.py @@ -1,7 +1,8 @@ -import pytest import time import zmq +import pytest +from tests.helpers import wait_for_result2 as wait_for_result from zmq_tubes.manager import TubeMessageTimeout from zmq_tubes.threads import Tube, TubeNode @@ -11,22 +12,31 @@ @pytest.fixture def data(): - return ['REQ10', 'REQ11', 'REQ20', 'REQ21'] + return ['REQ10', 'REQ11', 'REQ20', 'REQ21'].copy() + + +@pytest.fixture +def result(): + return [] -@pytest.fixture(params=[{'server': True, 'sleep': None}]) -def resp_node(data, request): +@pytest.fixture(params=[{'server': True, 'sleep': None, 'utf8_decoding': True}]) +def resp_node(result, request): def __process(req): - data.remove(req.payload) + result.append(req.payload) if request.param['sleep']: time.sleep(request.param['sleep']) - return req.create_response(f'RESP{req.payload[-2:]}') + return req.create_response( + f'RESP{req.payload[-2:]}' if request.param['utf8_decoding'] else + b'RESP' + req.payload[-2:] + ) tube = Tube( name='REP', addr=ADDR, server=request.param['server'], - tube_type=zmq.REP + tube_type=zmq.REP, + utf8_decoding=request.param['utf8_decoding'] ) node = TubeNode() @@ -35,13 +45,14 @@ def __process(req): return node -@pytest.fixture(params=[{'server': False}]) +@pytest.fixture(params=[{'server': False, 'utf8_decoding': True}]) def req_node1(request): tube = Tube( name='REQ1', addr=ADDR, server=request.param['server'], - tube_type=zmq.REQ + tube_type=zmq.REQ, + utf8_decoding=request.param['utf8_decoding'] ) node = TubeNode() @@ -49,39 +60,85 @@ def req_node1(request): return node -def test_resp_reqs(resp_node, req_node1, data): +@pytest.fixture(params=[{'server': False, 'utf8_decoding': True}]) +def req_node2(request): + tube = Tube( + name='REQ2', + addr=ADDR, + server=request.param['server'], + tube_type=zmq.REQ, + utf8_decoding=request.param['utf8_decoding'] + ) - with resp_node: - for _ in range(len(data)): - res = req_node1.request(f"{TOPIC}/A", data[0], timeout=1) - assert 'RESP' in res.payload + node = TubeNode() + node.register_tube(tube, f"{TOPIC}/#") + return node - assert len(data) == 0 +################################################################################ +# Tests +################################################################################ -def test_resp_reqs_on_same_node(resp_node, data): +def test_resp_reqs(resp_node, req_node1, data, result): + res = [] + result.clear() + with resp_node: + while data: + resp = req_node1.request(f"{TOPIC}/A", data.pop(), timeout=1) + res.append('RESP' in resp.payload) + assert wait_for_result( + lambda: len(res) == 4 and all(res) and len(result) == 4, + timeout=1 + ) + + +def test_resp_reqs_on_same_node(resp_node, data, result): + res = [] + result.clear() tube = Tube( - name='REQ1', + name='REQ', addr=ADDR, server=False, tube_type=zmq.REQ ) resp_node.register_tube(tube, f"{TOPIC}/#") with resp_node: - for _ in range(len(data)): - res = resp_node.request(f"{TOPIC}/A", data[0], timeout=1) - assert 'RESP' in res.payload - - assert len(data) == 0 + while data: + resp = resp_node.request(f"{TOPIC}/A", data.pop(), timeout=1) + res.append('RESP' in resp.payload) + assert wait_for_result( + lambda: len(res) == 4 and all(res) and len(result) == 4, + timeout=1 + ) @pytest.mark.parametrize("resp_node", - [({'server': True, 'sleep': 1})], + [({'server': True, 'sleep': 1, + 'utf8_decoding': True})], indirect=["resp_node"]) def test_req_resp_timeout(resp_node, req_node1, data): with resp_node: try: - req_node1.request(f"{TOPIC}/A", data[0], timeout=.6) + req_node1.request(f"{TOPIC}/A", data.pop(), timeout=.1) assert False except TubeMessageTimeout: assert True + + +@pytest.mark.parametrize("resp_node,req_node1", + [({'server': True, 'sleep': None, + 'utf8_decoding': False}, + {'server': False, 'utf8_decoding': False})], + indirect=["resp_node", "req_node1"]) +def test_req_resp_bytes(resp_node, req_node1, data, result): + result.clear() + with resp_node: + rr = req_node1.request(f"{TOPIC}/A", data.pop()) + assert isinstance(rr.payload, bytes) + rr = req_node1.request(f"{TOPIC}/A", data.pop(), + utf8_decoding=True) + assert not isinstance(rr.payload, bytes) + assert wait_for_result( + lambda: len(result) == 2 and isinstance(result[0], bytes), + timeout=1 + ) diff --git a/tests/threads/test_req_router.py b/tests/threads/test_req_router.py index d6c5f8e..185998b 100644 --- a/tests/threads/test_req_router.py +++ b/tests/threads/test_req_router.py @@ -3,7 +3,8 @@ import zmq -from ..helpers import run_test_threads, wrapp +from tests.helpers import wait_for_result2 as wait_for_result, \ + run_test_threads, wrapp from zmq_tubes.threads import Tube, TubeNode ADDR = 'ipc:///tmp/req_router.pipe' @@ -12,22 +13,35 @@ @pytest.fixture def data(): - return ['REQ10', 'REQ11', 'REQ20', 'REQ21'] + return ['REQ10', 'REQ11'].copy() -@pytest.fixture(params=[{'server': True, 'sleep': None}]) -def router_node(data, request): +@pytest.fixture +def data2(): + return ['REQ20', 'REQ21'].copy() + + +@pytest.fixture +def result(): + return [] + + +@pytest.fixture(params=[{'server': True, 'utf8_decoding': True}]) +def router_node(result, request): def __process(req): - data.remove(req.payload) - if 'REQ10' in req.payload: - time.sleep(.3) - return req.create_response(f'RESP{req.payload[-2:]}') + result.append(req.payload) + if isinstance(req.payload, str) and 'REQ10' in req.payload: + time.sleep(.1) + return req.create_response( + f'RESP1{req.payload[-2:]}' if request.param['utf8_decoding'] + else b'RESP1' + req.payload[-2:]) tube = Tube( name='ROUTER', addr=ADDR, server=request.param['server'], - tube_type=zmq.ROUTER + tube_type=zmq.ROUTER, + utf8_decoding=request.param['utf8_decoding'] ) node = TubeNode() @@ -36,13 +50,14 @@ def __process(req): return node -@pytest.fixture(params=[{'server': False}]) +@pytest.fixture(params=[{'server': False, 'utf8_decoding': True}]) def req_node1(request): tube = Tube( name='REQ1', addr=ADDR, server=request.param['server'], - tube_type=zmq.REQ + tube_type=zmq.REQ, + utf8_decoding=request.param['utf8_decoding'] ) node = TubeNode() @@ -50,13 +65,14 @@ def req_node1(request): return node -@pytest.fixture(params=[{'server': False}]) +@pytest.fixture(params=[{'server': False, 'utf8_decoding': True}]) def req_node2(request): tube = Tube( name='REQ2', addr=ADDR, server=request.param['server'], - tube_type=zmq.REQ + tube_type=zmq.REQ, + utf8_decoding=request.param['utf8_decoding'] ) node = TubeNode() @@ -64,38 +80,67 @@ def req_node2(request): return node -def test_resp_router(router_node, req_node1, req_node2, data): +################################################################################ +# Tests +################################################################################ + +def test_resp_router(router_node, req_node1, req_node2, data, data2, + result): res = [] @wrapp - def __process(node, p, d): - for it in d.copy(): - r = node.request(f"{TOPIC}/{p}", it, timeout=1) - res.append(r.payload) - + def __process(node, d, p): + while d: + resp = node.request(f"{TOPIC}/{p}", d.pop(), timeout=1) + res.append('RESP' in resp.payload) + result.clear() with router_node: run_test_threads( - __process(req_node1, 'A', data[0:2]), - __process(req_node2, 'B', data[2:]), + __process(req_node1, data, 'A'), + __process(req_node2, data2, 'B'), + ) + assert wait_for_result( + lambda: len(res) == 4 and len(result) == 4, + timeout=1 ) - - assert len(res) == 4 - assert len(data) == 0 -def test_resp_router_on_same_node(router_node, data): +def test_resp_router_on_same_node(router_node, data, result): + """ + The REQ/ROUTER and client on the same node. + """ + res = [] tube = Tube( - name='REQ1', + name='REQ', addr=ADDR, server=False, tube_type=zmq.REQ ) router_node.register_tube(tube, f"{TOPIC}/#") - + result.clear() with router_node: - for _ in range(len(data)): - res = router_node.request(f"{TOPIC}/A", data[0], timeout=1) - assert 'RESP' in res.payload + while data: + resp = router_node.request(f"{TOPIC}/A", data.pop(), timeout=1) + res.append('RESP' in resp.payload) + assert wait_for_result( + lambda: len(res) == 2 and len(result) == 2, + timeout=1 + ) + - assert len(data) == 0 +@pytest.mark.parametrize("router_node,req_node1", + [({'server': True, 'utf8_decoding': False}, + {'server': False, 'utf8_decoding': False})], + indirect=["router_node", "req_node1"]) +def test_req_router_bytes(router_node, req_node1, result): + result.clear() + with router_node: + res = req_node1.request(f"{TOPIC}/A", 'XXX1') + assert isinstance(res.payload, bytes) + res = req_node1.request(f"{TOPIC}/A", 'XXX2', utf8_decoding=True) + assert not isinstance(res.payload, bytes) + assert wait_for_result( + lambda: len(result) == 2 and isinstance(result[0], bytes), + timeout=1 + ) diff --git a/tests/threads/test_sub_pub.py b/tests/threads/test_sub_pub.py index eae24cf..3380456 100644 --- a/tests/threads/test_sub_pub.py +++ b/tests/threads/test_sub_pub.py @@ -4,7 +4,7 @@ import zmq -from ..helpers import run_test_threads, wrapp +from ..helpers import run_test_threads, wrapp, wait_for_result2 from zmq_tubes.threads import Tube, TubeNode ADDR = 'ipc:///tmp/sub_pub.pipe' @@ -13,24 +13,35 @@ @pytest.fixture def data(): - return ['PUB10', 'PUB11', 'PUB20', 'PUB21'] + return ['PUB10', 'PUB11', 'PUB20', 'PUB21'].copy() @pytest.fixture def data2(): - return ['PUB10', 'PUB11', 'PUB20', 'PUB21'] + return ['PUB10', 'PUB11', 'PUB20', 'PUB21'].copy() -@pytest.fixture(params=[{'server': True}]) -def sub_node(data, request): +@pytest.fixture +def result(): + return [] + + +@pytest.fixture +def result2(): + return [] + + +@pytest.fixture(params=[{'server': True, 'utf8_decoding': True}]) +def sub_node(result, request): def __process(req): - data.remove(req.payload) + result.append(req.payload) tube = Tube( name='SUB1', addr=ADDR, server=request.param['server'], - tube_type=zmq.SUB + tube_type=zmq.SUB, + utf8_decoding=request.param['utf8_decoding'] ) node = TubeNode() @@ -39,16 +50,17 @@ def __process(req): return node -@pytest.fixture(params=[{'server': True}]) -def sub_node2(data2, request): +@pytest.fixture(params=[{'server': True, 'utf8_decoding': True}]) +def sub_node2(result2, request): def __process(req): - data2.remove(req.payload) + result2.append(req.payload) tube = Tube( name='SUB2', addr=ADDR, server=request.param['server'], - tube_type=zmq.SUB + tube_type=zmq.SUB, + utf8_decoding=request.param['utf8_decoding'] ) node = TubeNode() @@ -85,59 +97,65 @@ def pub_node2(request): return node -def test_sub_pubs(sub_node, pub_node1, pub_node2, data): +################################################################################ +# Tests +################################################################################ + + +def test_sub_pubs(sub_node, pub_node1, pub_node2, data, data2, result): """ The subscriber is server and two clients publish messages. """ - @wrapp def __process(node, p, d): - for it in d.copy(): - node.publish(f"{TOPIC}/{p}", it) + while d: + node.publish(f"{TOPIC}/{p}", d.pop()) + result.clear() with sub_node, pub_node1, pub_node2: run_test_threads( - __process(pub_node1, 'A', data[0:2]), - __process(pub_node2, 'B', data[2:]), + __process(pub_node1, 'A', data), + __process(pub_node2, 'B', data2), ) - time.sleep(1) - - assert len(data) == 0 + assert wait_for_result2(lambda: len(result) == 8, timeout=1) @pytest.mark.parametrize("sub_node,sub_node2,pub_node1", - [({'server': False}, {'server': False}, + [({'server': False, 'utf8_decoding': True}, + {'server': False, 'utf8_decoding': True}, {'server': True})], indirect=["sub_node", "sub_node2", "pub_node1"]) -def test_pub_subs(sub_node, sub_node2, pub_node1, data, data2): +def test_pub_subs(sub_node, sub_node2, pub_node1, data, data2, result, result2): """ The subscriber is server and two clients publish messages. """ - @wrapp def __process(node, p, d): - for it in d.copy(): - node.publish(f"{TOPIC}/{p}", it) + while d: + node.publish(f"{TOPIC}/{p}", d.pop()) + result.clear() + result2.clear() with pub_node1, sub_node, sub_node2: run_test_threads( __process(pub_node1, 'A', data), ) - time.sleep(1) - - assert len(data) == 0 - assert len(data2) == 0 + assert wait_for_result2( + lambda: len(result) == 4 and len(result2) == 4, + timeout=1 + ) -def test_pub_sub_on_same_node(sub_node, data): +def test_pub_sub_on_same_node(sub_node, data, result): """ The publisher and client on the same node. """ def __process(node, p, d): - for it in d.copy(): - node.publish(f"{TOPIC}/{p}", it) + while d: + node.publish(f"{TOPIC}/{p}", d.pop()) + result.clear() tube = Tube( name='PUB', addr=ADDR, @@ -151,6 +169,18 @@ def __process(node, p, d): run_test_threads( __process(sub_node, 'A', data), ) - time.sleep(1) + assert wait_for_result2(lambda: len(result) == 4, timeout=1) + - assert len(data) == 0 +@pytest.mark.parametrize("sub_node", + [({'server': True, 'utf8_decoding': False})], + indirect=["sub_node"]) +def test_pub_sub_bytes(sub_node, pub_node1, result): + result.clear() + with sub_node, pub_node1: + time.sleep(.1) # we have to wait for server is ready + pub_node1.publish(f"{TOPIC}/A", 'XXX') + assert wait_for_result2( + lambda: len(result) > 0 and isinstance(result.pop(), bytes), + timeout=1 + ) diff --git a/zmq_tubes/manager.py b/zmq_tubes/manager.py index 4075139..0b0f3f6 100644 --- a/zmq_tubes/manager.py +++ b/zmq_tubes/manager.py @@ -98,7 +98,7 @@ def create_response(self, payload=None) -> 'TubeMessage': payload=payload ) - def parse(self, data): + def parse(self, data, utf8_decoding=True): if self.tube.tube_type == zmq.ROUTER: if len(data) != 4: raise TubeMessageError( @@ -113,12 +113,14 @@ def parse(self, data): f"is in unknown format. '{data}'") data.pop(0) - data = [it.decode('utf-8') for it in data] if len(data) != 2: raise TubeMessageError( f"The received message (tube '{self.tube.name}') " - f"is in unknown format. '{data}'") + f"is in unknown format. {data}") self.topic, self.payload = data + self.topic = self.topic.decode('utf-8') + if utf8_decoding: + self.payload = self.payload.decode('utf-8') def format_message(self): response = [] @@ -154,6 +156,7 @@ def __init__(self, **kwargs): self.tube_type = kwargs.get('tube_type') self.identity = kwargs.get('identity') self.monitor = kwargs.get('monitor') + self.utf8_decoding = kwargs.get('utf8_decoding', True) @staticmethod def get_tube_type_name(tube_type): @@ -379,6 +382,7 @@ async def request(self, *args, post_send_callback=None, :param payload: Optional[dict] :param timeout: int :param post_send_callback: Optional[Callable] + :param utf8_decoding: bool (default True) :return: """ if args: @@ -404,7 +408,7 @@ async def request(self, *args, post_send_callback=None, raise NotImplementedError("Unknown type of topic") async def __request_payload(self, topic: str, payload=None, timeout=None, - post_send_callback=None): + post_send_callback=None, utf8_decoding=None): request = TubeMessage( self, payload=payload, @@ -412,11 +416,12 @@ async def __request_payload(self, topic: str, payload=None, timeout=None, raw_socket=self.raw_socket ) return await self.__request_message( - request, timeout=timeout, post_send_callback=post_send_callback + request, timeout=timeout, post_send_callback=post_send_callback, + utf8_decoding=utf8_decoding ) async def __request_message(self, request: TubeMessage, timeout: int = 30, - post_send_callback=None): + post_send_callback=None, utf8_decoding=None): if self.tube_type != zmq.REQ: raise TubeMethodNotSupported( f"The tube '{self.name}' (type: '{self.tube_type_name}') " @@ -431,7 +436,9 @@ async def __request_message(self, request: TubeMessage, timeout: int = 30, post_send_callback(request) if await request.raw_socket.poll(timeout * 1000) != 0: response = await self.receive_data( - raw_socket=request.raw_socket) + raw_socket=request.raw_socket, + utf8_decoding=utf8_decoding + ) if response.topic != request.topic: raise TubeMessageError( f"The response comes to different topic " @@ -447,14 +454,17 @@ async def __request_message(self, request: TubeMessage, timeout: int = 30, raise TubeMessageTimeout( f"No answer for the request in {timeout}s. Topic: {request.topic}") - async def receive_data(self, raw_socket=None): + async def receive_data(self, raw_socket=None, utf8_decoding=None): if not raw_socket: raw_socket = self.raw_socket raw_data = await raw_socket.recv_multipart() self.logger.debug( f"Received (tube {self.name}): {raw_data}") message = TubeMessage(tube=self, raw_socket=raw_socket) - message.parse(raw_data) + message.parse( + raw_data, + self.utf8_decoding if utf8_decoding is None else utf8_decoding + ) try: if self.monitor: await self.monitor.receive_message(message) @@ -653,7 +663,7 @@ def get_tube_by_topic(self, topic: str, types=None) -> Tube: if not res: return None if isinstance(res, list): - res = res.pop() + res = res[-1] return res def filter_tube_by_topic(self, topic: str, types=None) -> [(str, Tube)]: @@ -713,13 +723,15 @@ async def send(self, topic: str, payload=None, tube=None): await tube.send(topic, payload) async def request(self, topic: str, payload=None, timeout=30, - post_send_callback=None) -> TubeMessage: + post_send_callback=None, utf8_decoding=None + ) -> TubeMessage: tube = self.get_tube_by_topic(topic, [zmq.REQ]) if not tube: raise TubeTopicNotConfigured(f'The topic "{topic}" is not assigned ' f'to any Tube for request.') res = await tube.request(topic, payload, timeout=timeout, - post_send_callback=post_send_callback) + post_send_callback=post_send_callback, + utf8_decoding=utf8_decoding) return res async def publish(self, topic: str, payload=None): diff --git a/zmq_tubes/threads.py b/zmq_tubes/threads.py index 6a0d596..1467bc6 100644 --- a/zmq_tubes/threads.py +++ b/zmq_tubes/threads.py @@ -149,6 +149,7 @@ def request(self, *args, post_send_callback=None, **kwargs) -> TubeMessage: :param payload: Optional[dict] :param timeout: int :param post_send_callback: Optional[Callable] + :param utf8_decoding: bool (default True) :return: TubeMessage """ if args: @@ -172,18 +173,19 @@ def request(self, *args, post_send_callback=None, **kwargs) -> TubeMessage: raise NotImplementedError("Unknown type of topic") def __request_payload(self, topic: str, payload=None, timeout=None, - post_send_callback=None): + post_send_callback=None, utf8_decoding=None): request = TubeMessage( self, payload=payload, topic=topic, raw_socket=self.raw_socket, ) - return self.request(request, timeout=timeout, - post_send_callback=post_send_callback) + return self.__request_message(request, timeout=timeout, + post_send_callback=post_send_callback, + utf8_decoding=utf8_decoding) def __request_message(self, request: TubeMessage, timeout: int = 30, - post_send_callback=None): + post_send_callback=None, utf8_decoding=None): if self.tube_type != zmq.REQ: raise TubeMethodNotSupported( f"The tube '{self.name}' (type: '{self.tube_type_name}') " @@ -194,7 +196,10 @@ def __request_message(self, request: TubeMessage, timeout: int = 30, if post_send_callback: post_send_callback(request) if request.raw_socket.poll(timeout * 1000) != 0: - response = self.receive_data(raw_socket=request.raw_socket) + response = self.receive_data( + raw_socket=request.raw_socket, + utf8_decoding=utf8_decoding + ) if response.topic != request.topic: raise TubeMessageError( f"The response comes to different topic " @@ -210,7 +215,7 @@ def __request_message(self, request: TubeMessage, timeout: int = 30, raise TubeMessageTimeout( f"No answer for the request in {timeout}s. Topic: {request.topic}") - def receive_data(self, raw_socket=None, timeout=3): + def receive_data(self, raw_socket=None, timeout=3, utf8_decoding=None): if not raw_socket: raw_socket = self.raw_socket if not self.lock.acquire(timeout=timeout): @@ -223,7 +228,10 @@ def receive_data(self, raw_socket=None, timeout=3): self.logger.debug( f"Received (tube {self.name}): {raw_data}") message = TubeMessage(tube=self, raw_socket=raw_socket) - message.parse(raw_data) + message.parse( + raw_data, + self.utf8_decoding if utf8_decoding is None else utf8_decoding + ) try: if self.monitor: self.monitor.receive_message(message) @@ -279,13 +287,14 @@ def send(self, topic: str, payload=None, tube=None): tube.send(topic, payload) def request(self, topic: str, payload=None, timeout=30, - post_send_callback=None) -> TubeMessage: + post_send_callback=None, utf8_decoding=None) -> TubeMessage: tube = self.get_tube_by_topic(topic, [zmq.REQ]) if not tube: raise TubeTopicNotConfigured(f'The topic "{topic}" is not assigned ' f'to any Tube for request.') res = tube.request(topic, payload, timeout=timeout, - post_send_callback=post_send_callback) + post_send_callback=post_send_callback, + utf8_decoding=utf8_decoding) return res def publish(self, topic: str, payload=None):