Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python38 and refactor tests to use pytest-asyncio #18

Merged
merged 14 commits into from
Mar 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.6, 3.7]
python-version: [3.6, 3.7, 3.8, 3.9]

steps:
- uses: actions/checkout@v2
Expand All @@ -26,7 +26,7 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install flake8 pytest
python -m pip install flake8 pytest pytest-asyncio
python -m pip install .
- name: Lint with flake8
run: |
Expand Down
6 changes: 4 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
dist: trusty
dist: xenial
sudo: false
language: python

python:
- "3.4"
- "3.5"
- "3.6"
- "3.7"
- "3.8"
- "3.9"
- "nightly"
matrix:
allow_failures:
Expand Down
5 changes: 2 additions & 3 deletions docs/references/broker.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ The following example shows how to start a broker using the default configuratio
from hbmqtt.broker import Broker


@asyncio.coroutine
def broker_coro():
async def broker_coro():
broker = Broker()
yield from broker.start()
await broker.start()


if __name__ == '__main__':
Expand Down
35 changes: 16 additions & 19 deletions docs/references/mqttclient.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,22 @@ The example below shows how to write a simple MQTT client which subscribes a top

logger = logging.getLogger(__name__)

@asyncio.coroutine
def uptime_coro():
async def uptime_coro():
C = MQTTClient()
yield from C.connect('mqtt://test.mosquitto.org/')
await C.connect('mqtt://test.mosquitto.org/')
# Subscribe to '$SYS/broker/uptime' with QOS=1
# Subscribe to '$SYS/broker/load/#' with QOS=2
yield from C.subscribe([
await C.subscribe([
('$SYS/broker/uptime', QOS_1),
('$SYS/broker/load/#', QOS_2),
])
try:
for i in range(1, 100):
message = yield from C.deliver_message()
message = await C.deliver_message()
packet = message.publish_packet
print("%d: %s => %s" % (i, packet.variable_header.topic_name, str(packet.payload.data)))
yield from C.unsubscribe(['$SYS/broker/uptime', '$SYS/broker/load/#'])
yield from C.disconnect()
await C.unsubscribe(['$SYS/broker/uptime', '$SYS/broker/load/#'])
await C.disconnect()
except ClientException as ce:
logger.error("Client exception: %s" % ce)

Expand Down Expand Up @@ -71,31 +70,29 @@ This example also shows to method for publishing message asynchronously.

logger = logging.getLogger(__name__)

@asyncio.coroutine
def test_coro():
async def test_coro():
C = MQTTClient()
yield from C.connect('mqtt://test.mosquitto.org/')
await C.connect('mqtt://test.mosquitto.org/')
tasks = [
asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_0')),
asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1)),
asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)),
]
yield from asyncio.wait(tasks)
await asyncio.wait(tasks)
logger.info("messages published")
yield from C.disconnect()
await C.disconnect()


@asyncio.coroutine
def test_coro2():
async def test_coro2():
try:
C = MQTTClient()
ret = yield from C.connect('mqtt://test.mosquitto.org:1883/')
message = yield from C.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0)
message = yield from C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1)
message = yield from C.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)
ret = await C.connect('mqtt://test.mosquitto.org:1883/')
message = await C.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0)
message = await C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1)
message = await C.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)
#print(message)
logger.info("messages published")
yield from C.disconnect()
await C.disconnect()
except ConnectException as ce:
logger.error("Connection failed: %s" % ce)
asyncio.get_event_loop().stop()
Expand Down
57 changes: 22 additions & 35 deletions hbmqtt/adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ class ReaderAdapter:
Reader adapters are used to adapt read operations on the network depending on the protocol used
"""

@asyncio.coroutine
def read(self, n=-1) -> bytes:
async def read(self, n=-1) -> bytes:
"""
Read up to n bytes. If n is not provided, or set to -1, read until EOF and return all read bytes.
If the EOF was received and the internal buffer is empty, return an empty bytes object.
Expand All @@ -42,8 +41,7 @@ def write(self, data):
write some data to the protocol layer
"""

@asyncio.coroutine
def drain(self):
async def drain(self):
"""
Let the write buffer of the underlying transport a chance to be flushed.
"""
Expand All @@ -53,8 +51,7 @@ def get_peer_info(self):
Return peer socket info (remote address and remote port as tuple
"""

@asyncio.coroutine
def close(self):
async def close(self):
"""
Close the protocol connection
"""
Expand All @@ -69,22 +66,20 @@ def __init__(self, protocol: WebSocketCommonProtocol):
self._protocol = protocol
self._stream = io.BytesIO(b'')

@asyncio.coroutine
def read(self, n=-1) -> bytes:
yield from self._feed_buffer(n)
async def read(self, n=-1) -> bytes:
await self._feed_buffer(n)
data = self._stream.read(n)
return data

@asyncio.coroutine
def _feed_buffer(self, n=1):
async def _feed_buffer(self, n=1):
"""
Feed the data buffer by reading a Websocket message.
:param n: if given, feed buffer until it contains at least n bytes
"""
buffer = bytearray(self._stream.read())
while len(buffer) < n:
try:
message = yield from self._protocol.recv()
message = await self._protocol.recv()
except ConnectionClosed:
message = None
if message is None:
Expand All @@ -110,22 +105,20 @@ def write(self, data):
"""
self._stream.write(data)

@asyncio.coroutine
def drain(self):
async def drain(self):
"""
Let the write buffer of the underlying transport a chance to be flushed.
"""
data = self._stream.getvalue()
if len(data):
yield from self._protocol.send(data)
await self._protocol.send(data)
self._stream = io.BytesIO(b'')

def get_peer_info(self):
return self._protocol.remote_address

@asyncio.coroutine
def close(self):
yield from self._protocol.close()
async def close(self):
await self._protocol.close()


class StreamReaderAdapter(ReaderAdapter):
Expand All @@ -137,12 +130,11 @@ class StreamReaderAdapter(ReaderAdapter):
def __init__(self, reader: StreamReader):
self._reader = reader

@asyncio.coroutine
def read(self, n=-1) -> bytes:
async def read(self, n=-1) -> bytes:
if n == -1:
data = yield from self._reader.read(n)
data = await self._reader.read(n)
else:
data = yield from self._reader.readexactly(n)
data = await self._reader.readexactly(n)
return data

def feed_eof(self):
Expand All @@ -164,24 +156,22 @@ def write(self, data):
if not self.is_closed:
self._writer.write(data)

@asyncio.coroutine
def drain(self):
async def drain(self):
if not self.is_closed:
yield from self._writer.drain()
await self._writer.drain()

def get_peer_info(self):
extra_info = self._writer.get_extra_info('peername')
return extra_info[0], extra_info[1]

@asyncio.coroutine
def close(self):
async def close(self):
if not self.is_closed:
self.is_closed = True # we first mark this closed so yields below don't cause races with waiting writes
yield from self._writer.drain()
await self._writer.drain()
if self._writer.can_write_eof():
self._writer.write_eof()
self._writer.close()
try: yield from self._writer.wait_closed() # py37+
try: await self._writer.wait_closed() # py37+
except AttributeError: pass


Expand All @@ -193,8 +183,7 @@ class BufferReader(ReaderAdapter):
def __init__(self, buffer: bytes):
self._stream = io.BytesIO(buffer)

@asyncio.coroutine
def read(self, n=-1) -> bytes:
async def read(self, n=-1) -> bytes:
return self._stream.read(n)


Expand All @@ -212,8 +201,7 @@ def write(self, data):
"""
self._stream.write(data)

@asyncio.coroutine
def drain(self):
async def drain(self):
pass

def get_buffer(self):
Expand All @@ -222,6 +210,5 @@ def get_buffer(self):
def get_peer_info(self):
return "BufferWriter", 0

@asyncio.coroutine
def close(self):
async def close(self):
self._stream.close()
Loading