Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

utf8_decoding & tests #15

Merged
merged 6 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[flake8]
max-line-length = 80
ignore =
E701
E701, E131, W504
exclude =
.git,
.venv,
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,4 @@ jobs:

- name: Run pytest
shell: bash
run: python -m pytest -p no:sugar tests/
run: python -m pytest -p no:sugar --log-cli-level=DEBUG --capture=tee-sys tests
104 changes: 77 additions & 27 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,58 +29,109 @@ from zmq_tubes.threads import TubeNode, Tube # Threads classes
## Usage:

### Node definitions in yml file
We can define all tubes for one TubeNode by yml file.

We can define all tubes for one TubeNode by yml file.
Next examples require install these packages `PyYAML`, `pyzmq` and `zmq_tubes`.
#### Client service (asyncio example)
```yaml
# test.yml
# client.yml
tubes:
- name: Client REQ
addr: ipc:///tmp/req.pipe
addr: ipc:///tmp/req.pipe
tube_type: REQ
topics:
- foo/#
- +/bar
- foo/bar

- name: Client PUB
addr: ipc:///tmp/pub.pipe
addr: ipc:///tmp/pub.pipe
tube_type: PUB
topics:
- foo/pub/#
```

```python
# client.py
import asyncio
import yaml
from zmq_tubes import TubeNode, TubeMessage


async def run():
with open('client.yml', 'r+') as fd:
schema = yaml.safe_load(fd)
node = TubeNode(schema=schema)
async with node:
print(await node.request('foo/bar', 'message 1'))
await node.publish('foo/pub/test', 'message 2')

- name: Server ROUTER
addr: ipc:///tmp/router.pipe
if __name__ == "__main__":
asyncio.run(run())
```
```shell
> python client.py
topic: foo/bar, payload: response
```


#### Server service (threads example)
```yaml
# server.yml
tubes:
- name: server ROUTER
addr: ipc:///tmp/req.pipe
tube_type: ROUTER
server: yes
sockopts:
LINGER: 0
server: True
topics:
- foo/bar

- name: server SUB
addr: ipc:///tmp/pub.pipe
tube_type: SUB
server: True
topics:
- server/#
- foo/pub/#
```

```python
import asyncio
# server.py
import yaml
from zmq_tubes import TubeNode, TubeMessage
from zmq_tubes.threads import TubeNode, TubeMessage


async def handler(request: TubeMessage):
def handler(request: TubeMessage):
print(request.payload)
return request.create_response('response')
if request.tube.tube_type_name == 'ROUTER':
return request.create_response('response')


async def run():
with open('test.yml', 'r+') as fd:
def run():
with open('server.yml', 'r+') as fd:
schema = yaml.safe_load(fd)
node = TubeNode(schema=schema)
node.register_handler('server/#', handler)
async with node:
await node.publish('foo/pub/test', 'message 1')
print(await node.request('foo/xxx', 'message 2'))
node.register_handler('foo/#', handler)
with node:
node.start().join()

if __name__ == "__main__":
run()
```

asyncio.run(run())
```shell
> python server.py
message 1
message 2
```

### YAML definition

The yaml file starts with a root element `tubes`, which contains list of all our tube definitions.
- `name` - string - name of the tube.
- `addr` - string - connection or bind address in format `transport://address` (see more http://api.zeromq.org/2-1:zmq-connect)
- `server` - bool - is this tube server side (bind to `addr`) or client side (connect to `addr`)
- `tube_type` - string - type of this tube (see more https://zguide.zeromq.org/docs/chapter2/#Messaging-Patterns)
- `identity` - string - (optional) we can setup custom tube identity
- `utf8_decoding` - bool - (default = True), if this is True, the payload is automatically UTF8 decode.
- `sockopts` - dict - (optional) we can setup sockopts for this tube (see more http://api.zeromq.org/4-2:zmq-setsockopt)
- `monitor` - string - (optional) bind address of tube monitor (see more [Debugging / Monitoring](#debugging-/-monitoring))


### Request / Response
Expand Down Expand Up @@ -113,7 +164,6 @@ await node.start()
```

#### Client:

```python
from zmq_tubes import Tube, TubeNode

Expand All @@ -129,8 +179,8 @@ response = await node.request('test/xxx', 'question')
print(response.payload)
# output: 'answer'
```


The method `request` accepts the optional parameter `utf8_decoding`. When we set this parameter to `False` in previous
example, the returned payload is not automatically decoded, we get bytes.


### Subscribe / Publisher
Expand Down
90 changes: 61 additions & 29 deletions tests/async/test_dealer_dealer.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import sys

import asyncio

import zmq
import pytest

from tests.helpers import wait_for_result
from zmq_tubes import Tube, TubeNode

pytestmark = pytest.mark.skipif(sys.version_info < (3, 7),
Expand All @@ -16,20 +15,28 @@

@pytest.fixture
def data():
return ['REQ10', 'REQ11', 'REQ20', 'REQ21']
return ['REQ10', 'REQ11', 'REQ20', 'REQ21'].copy()


@pytest.fixture
def result():
return []


@pytest.fixture(params=[{'server': True}])
def dealer_node1(request, data):
@pytest.fixture(params=[{'server': True, 'utf8_decoding': True}])
def dealer_node1(request, result):
async def __process(req):
data.remove(req.payload)
await req.tube.send(req.create_response(f'DEALER1-{req.payload[-2:]}'))
result.append(req.payload)
await req.tube.send(req.create_response(
f'DEALER1{req.payload[-2:]}' if request.param['utf8_decoding']
else b'DEALER1' + req.payload[-2:]))

tube = Tube(
name='DEALER1',
addr=ADDR,
server=request.param['server'],
tube_type=zmq.DEALER
tube_type=zmq.DEALER,
utf8_decoding=request.param['utf8_decoding']
)

node = TubeNode()
Expand All @@ -38,48 +45,52 @@ async def __process(req):
return node


@pytest.fixture(params=[{'server': False}])
@pytest.fixture(params=[{'server': False, 'utf8_decoding': True}])
def dealer_node2(request):
tube = Tube(
name='DEALER2',
addr=ADDR,
server=request.param['server'],
tube_type=zmq.DEALER
tube_type=zmq.DEALER,
utf8_decoding=request.param['utf8_decoding']
)

node = TubeNode()
node.register_tube(tube, f"{TOPIC}/#")
return node


################################################################################
# Tests
################################################################################


@pytest.mark.asyncio
async def test_dealer_dealer(dealer_node1, dealer_node2, data):
async def test_dealer_dealer(dealer_node1, dealer_node2, data, result):
res = []

async def __process(req):
res.append(req.payload)

dealer_node2.register_handler(f"{TOPIC}/#", __process)

result.clear()
async with dealer_node1, dealer_node2:
for _ in range(len(data)):
await dealer_node2.send(f"{TOPIC}/A", data[0])
await asyncio.sleep(.2)
await asyncio.sleep(1)

assert len(res) == 4
assert len(data) == 0
while data:
await dealer_node2.send(f"{TOPIC}/A", data.pop())
assert await wait_for_result(
lambda: len(res) == 4 and len(result) == 4,
timeout=1
)


@pytest.mark.asyncio
async def test_dealer_dealer_on_same_node(dealer_node1, data):
async def test_dealer_dealer_on_same_node(dealer_node1, data, result):
res = []

async def __process(req):
res.append(req.payload)

result.clear()
tube = Tube(
name='DEALER1',
name='DEALER2',
addr=ADDR,
server=False,
tube_type=zmq.DEALER
Expand All @@ -89,10 +100,31 @@ async def __process(req):
dealer_node1.register_handler(f"{TOPIC}/#", __process, tube)

async with dealer_node1:
for _ in range(len(data)):
await dealer_node1.send(f"{TOPIC}/A", data[0])
await asyncio.sleep(.2)
await asyncio.sleep(1)
while data:
await dealer_node1.send(f"{TOPIC}/A", data.pop())
assert await wait_for_result(
lambda: len(res) == 4 and len(result) == 4,
timeout=1
)


@pytest.mark.asyncio
@pytest.mark.parametrize("dealer_node1,dealer_node2",
[({'server': True, 'utf8_decoding': False},
{'server': False, 'utf8_decoding': False})],
indirect=["dealer_node1", "dealer_node2"])
async def test_dealer_reps_bytes(dealer_node1, dealer_node2, result):
res = []

assert len(res) == 4
assert len(data) == 0
async def __process(req):
res.append(req.payload)
dealer_node2.register_handler(f"{TOPIC}/#", __process)

result.clear()
async with dealer_node1, dealer_node2:
await dealer_node2.send(f"{TOPIC}/A", 'XXX')
assert await wait_for_result(
lambda: len(res) == 1 and isinstance(res[0], bytes) and
len(result) == 1 and isinstance(result[0], bytes),
timeout=1
)
Loading