Skip to content

Commit

Permalink
utf8_decoding & tests (#15)
Browse files Browse the repository at this point in the history
* Fix #14

* Fix PEP8

* Fix debug in tests
---------

Co-authored-by: Martin Korbel <[email protected]>
  • Loading branch information
BlackSmith and Martin Korbel authored Jul 25, 2023
1 parent 7c20fbc commit 1a056a9
Show file tree
Hide file tree
Showing 20 changed files with 1,042 additions and 481 deletions.
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[flake8]
max-line-length = 80
ignore =
E701
E701, E131, W504
exclude =
.git,
.venv,
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,4 @@ jobs:
- name: Run pytest
shell: bash
run: python -m pytest -p no:sugar tests/
run: python -m pytest -p no:sugar --log-cli-level=DEBUG --capture=tee-sys tests
104 changes: 77 additions & 27 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,58 +29,109 @@ from zmq_tubes.threads import TubeNode, Tube # Threads classes
## Usage:

### Node definitions in yml file
We can define all tubes for one TubeNode by yml file.

We can define all tubes for one TubeNode by yml file.
Next examples require install these packages `PyYAML`, `pyzmq` and `zmq_tubes`.
#### Client service (asyncio example)
```yaml
# test.yml
# client.yml
tubes:
- name: Client REQ
addr: ipc:///tmp/req.pipe
addr: ipc:///tmp/req.pipe
tube_type: REQ
topics:
- foo/#
- +/bar
- foo/bar

- name: Client PUB
addr: ipc:///tmp/pub.pipe
addr: ipc:///tmp/pub.pipe
tube_type: PUB
topics:
- foo/pub/#
```
```python
# client.py
import asyncio
import yaml
from zmq_tubes import TubeNode, TubeMessage


async def run():
with open('client.yml', 'r+') as fd:
schema = yaml.safe_load(fd)
node = TubeNode(schema=schema)
async with node:
print(await node.request('foo/bar', 'message 1'))
await node.publish('foo/pub/test', 'message 2')

- name: Server ROUTER
addr: ipc:///tmp/router.pipe
if __name__ == "__main__":
asyncio.run(run())
```
```shell
> python client.py
topic: foo/bar, payload: response
```
#### Server service (threads example)
```yaml
# server.yml
tubes:
- name: server ROUTER
addr: ipc:///tmp/req.pipe
tube_type: ROUTER
server: yes
sockopts:
LINGER: 0
server: True
topics:
- foo/bar

- name: server SUB
addr: ipc:///tmp/pub.pipe
tube_type: SUB
server: True
topics:
- server/#
- foo/pub/#
```
```python
import asyncio
# server.py
import yaml
from zmq_tubes import TubeNode, TubeMessage
from zmq_tubes.threads import TubeNode, TubeMessage


async def handler(request: TubeMessage):
def handler(request: TubeMessage):
print(request.payload)
return request.create_response('response')
if request.tube.tube_type_name == 'ROUTER':
return request.create_response('response')


async def run():
with open('test.yml', 'r+') as fd:
def run():
with open('server.yml', 'r+') as fd:
schema = yaml.safe_load(fd)
node = TubeNode(schema=schema)
node.register_handler('server/#', handler)
async with node:
await node.publish('foo/pub/test', 'message 1')
print(await node.request('foo/xxx', 'message 2'))
node.register_handler('foo/#', handler)
with node:
node.start().join()

if __name__ == "__main__":
run()
```
asyncio.run(run())
```shell
> python server.py
message 1
message 2
```

### YAML definition

The yaml file starts with a root element `tubes`, which contains list of all our tube definitions.
- `name` - string - name of the tube.
- `addr` - string - connection or bind address in format `transport://address` (see more http://api.zeromq.org/2-1:zmq-connect)
- `server` - bool - is this tube server side (bind to `addr`) or client side (connect to `addr`)
- `tube_type` - string - type of this tube (see more https://zguide.zeromq.org/docs/chapter2/#Messaging-Patterns)
- `identity` - string - (optional) we can setup custom tube identity
- `utf8_decoding` - bool - (default = True), if this is True, the payload is automatically UTF8 decode.
- `sockopts` - dict - (optional) we can setup sockopts for this tube (see more http://api.zeromq.org/4-2:zmq-setsockopt)
- `monitor` - string - (optional) bind address of tube monitor (see more [Debugging / Monitoring](#debugging-/-monitoring))


### Request / Response
Expand Down Expand Up @@ -113,7 +164,6 @@ await node.start()
```

#### Client:

```python
from zmq_tubes import Tube, TubeNode

Expand All @@ -129,8 +179,8 @@ response = await node.request('test/xxx', 'question')
print(response.payload)
# output: 'answer'
```


The method `request` accepts the optional parameter `utf8_decoding`. When we set this parameter to `False` in previous
example, the returned payload is not automatically decoded, we get bytes.


### Subscribe / Publisher
Expand Down
90 changes: 61 additions & 29 deletions tests/async/test_dealer_dealer.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import sys

import asyncio

import zmq
import pytest

from tests.helpers import wait_for_result
from zmq_tubes import Tube, TubeNode

pytestmark = pytest.mark.skipif(sys.version_info < (3, 7),
Expand All @@ -16,20 +15,28 @@

@pytest.fixture
def data():
return ['REQ10', 'REQ11', 'REQ20', 'REQ21']
return ['REQ10', 'REQ11', 'REQ20', 'REQ21'].copy()


@pytest.fixture
def result():
return []


@pytest.fixture(params=[{'server': True}])
def dealer_node1(request, data):
@pytest.fixture(params=[{'server': True, 'utf8_decoding': True}])
def dealer_node1(request, result):
async def __process(req):
data.remove(req.payload)
await req.tube.send(req.create_response(f'DEALER1-{req.payload[-2:]}'))
result.append(req.payload)
await req.tube.send(req.create_response(
f'DEALER1{req.payload[-2:]}' if request.param['utf8_decoding']
else b'DEALER1' + req.payload[-2:]))

tube = Tube(
name='DEALER1',
addr=ADDR,
server=request.param['server'],
tube_type=zmq.DEALER
tube_type=zmq.DEALER,
utf8_decoding=request.param['utf8_decoding']
)

node = TubeNode()
Expand All @@ -38,48 +45,52 @@ async def __process(req):
return node


@pytest.fixture(params=[{'server': False}])
@pytest.fixture(params=[{'server': False, 'utf8_decoding': True}])
def dealer_node2(request):
tube = Tube(
name='DEALER2',
addr=ADDR,
server=request.param['server'],
tube_type=zmq.DEALER
tube_type=zmq.DEALER,
utf8_decoding=request.param['utf8_decoding']
)

node = TubeNode()
node.register_tube(tube, f"{TOPIC}/#")
return node


################################################################################
# Tests
################################################################################


@pytest.mark.asyncio
async def test_dealer_dealer(dealer_node1, dealer_node2, data):
async def test_dealer_dealer(dealer_node1, dealer_node2, data, result):
res = []

async def __process(req):
res.append(req.payload)

dealer_node2.register_handler(f"{TOPIC}/#", __process)

result.clear()
async with dealer_node1, dealer_node2:
for _ in range(len(data)):
await dealer_node2.send(f"{TOPIC}/A", data[0])
await asyncio.sleep(.2)
await asyncio.sleep(1)

assert len(res) == 4
assert len(data) == 0
while data:
await dealer_node2.send(f"{TOPIC}/A", data.pop())
assert await wait_for_result(
lambda: len(res) == 4 and len(result) == 4,
timeout=1
)


@pytest.mark.asyncio
async def test_dealer_dealer_on_same_node(dealer_node1, data):
async def test_dealer_dealer_on_same_node(dealer_node1, data, result):
res = []

async def __process(req):
res.append(req.payload)

result.clear()
tube = Tube(
name='DEALER1',
name='DEALER2',
addr=ADDR,
server=False,
tube_type=zmq.DEALER
Expand All @@ -89,10 +100,31 @@ async def __process(req):
dealer_node1.register_handler(f"{TOPIC}/#", __process, tube)

async with dealer_node1:
for _ in range(len(data)):
await dealer_node1.send(f"{TOPIC}/A", data[0])
await asyncio.sleep(.2)
await asyncio.sleep(1)
while data:
await dealer_node1.send(f"{TOPIC}/A", data.pop())
assert await wait_for_result(
lambda: len(res) == 4 and len(result) == 4,
timeout=1
)


@pytest.mark.asyncio
@pytest.mark.parametrize("dealer_node1,dealer_node2",
[({'server': True, 'utf8_decoding': False},
{'server': False, 'utf8_decoding': False})],
indirect=["dealer_node1", "dealer_node2"])
async def test_dealer_reps_bytes(dealer_node1, dealer_node2, result):
res = []

assert len(res) == 4
assert len(data) == 0
async def __process(req):
res.append(req.payload)
dealer_node2.register_handler(f"{TOPIC}/#", __process)

result.clear()
async with dealer_node1, dealer_node2:
await dealer_node2.send(f"{TOPIC}/A", 'XXX')
assert await wait_for_result(
lambda: len(res) == 1 and isinstance(res[0], bytes) and
len(result) == 1 and isinstance(result[0], bytes),
timeout=1
)
Loading

0 comments on commit 1a056a9

Please sign in to comment.