Skip to content

Commit

Permalink
Add mypy
Browse files Browse the repository at this point in the history
Signed-off-by: Joostlek <[email protected]>
  • Loading branch information
joostlek committed Feb 21, 2024
1 parent a34a808 commit 99fcf05
Show file tree
Hide file tree
Showing 16 changed files with 346 additions and 121 deletions.
36 changes: 36 additions & 0 deletions .github/workflows/typing.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
---
name: Typing

on:
push:
branches:
- main
pull_request:
workflow_dispatch:

env:
DEFAULT_PYTHON: "3.11"

jobs:
mypy:
name: mypy
runs-on: ubuntu-latest
steps:
- name: ⤵️ Check out code from GitHub
uses: actions/[email protected]
- name: 🏗 Set up Poetry
run: pipx install poetry
- name: 🏗 Set up Python ${{ env.DEFAULT_PYTHON }}
id: python
uses: actions/[email protected]
with:
python-version: ${{ env.DEFAULT_PYTHON }}
cache: "poetry"
- name: 🏗 Install workflow dependencies
run: |
poetry config virtualenvs.create true
poetry config virtualenvs.in-project true
- name: 🏗 Install dependencies
run: poetry install --no-interaction
- name: 🚀 Run mypy
run: poetry run mypy src tests
60 changes: 59 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 35 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,43 @@ pydantic = ">=1"
pytest = "^8.0"
pytest-asyncio = "^0.23"
ruff = "^0.2.2"
mypy = "^1.8.0"

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"


[tool.mypy]
# Specify the target platform details in config, so your developers are
# free to run mypy on Windows, Linux, or macOS and get consistent
# results.
platform = "linux"
python_version = "3.10"

# show error messages from unrelated files
follow_imports = "normal"

# suppress errors about unsatisfied imports
ignore_missing_imports = true

# be strict
check_untyped_defs = true
disallow_any_generics = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
no_implicit_optional = true
strict_optional = true
warn_incomplete_stub = true
warn_no_return = true
warn_redundant_casts = true
warn_return_any = true
warn_unused_configs = true
warn_unused_ignores = true

[tool.ruff]
line-length = 80

Expand All @@ -61,6 +93,7 @@ ignore = [
"DTZ005",
"EM101",
"EM102",
"FBT001",
"FBT002",
"FBT003",
"G002",
Expand All @@ -76,6 +109,7 @@ ignore = [
"D102", # documentation info, should be removed after fixes
"D103", # documentation info, should be removed after fixes
"PERF102",
"PGH003",
"PLE1205",
"PLR2004", # Just annoying, not really useful
"PLR0912",
Expand All @@ -84,6 +118,7 @@ ignore = [
"RET507",
"SIM102",
"SLF001",
"S101",
"T201",
"UP007",
]
Expand Down
24 changes: 13 additions & 11 deletions roombapy/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,22 @@ class RoombaDiscovery:
udp_port = 5678
roomba_message = "irobotmcs"
amount_of_broadcasted_messages = 5
server_socket = None
log = None
server_socket: socket.socket
log: logging.Logger

def __init__(self):
def __init__(self) -> None:
"""Init discovery."""
self.server_socket = _get_socket()
self.log = logging.getLogger(__name__)

def find(self, ip=None):
def find(
self, ip: str | None = None
) -> Optional[RoombaInfo] | set[RoombaInfo]:
if ip is not None:
return self.get(ip)
return self.get_all()

def get_all(self):
def get_all(self) -> set[RoombaInfo]:
self._start_server()
self._broadcast_message(self.amount_of_broadcasted_messages)
robots = set()
Expand All @@ -40,12 +42,12 @@ def get_all(self):
break
return robots

def get(self, ip):
def get(self, ip: str) -> Optional[RoombaInfo]:
self._start_server()
self._send_message(ip)
return self._get_response(ip)

def _get_response(self, ip=None):
def _get_response(self, ip: str | None = None) -> Optional[RoombaInfo]:
try:
while True:
raw_response, addr = self.server_socket.recvfrom(1024)
Expand All @@ -63,20 +65,20 @@ def _get_response(self, ip=None):
self.log.info("Socket timeout")
return None

def _broadcast_message(self, amount):
def _broadcast_message(self, amount: int) -> None:
for i in range(amount):
self.server_socket.sendto(
self.roomba_message.encode(), (self.udp_address, self.udp_port)
)
self.log.debug("Broadcast message sent: %s", i)

def _send_message(self, udp_address):
def _send_message(self, udp_address: str) -> None:
self.server_socket.sendto(
self.roomba_message.encode(), (udp_address, self.udp_port)
)
self.log.debug("Message sent")

def _start_server(self):
def _start_server(self) -> None:
self.server_socket.bind((self.udp_bind_address, self.udp_port))
self.log.debug("Socket server started, port %s", self.udp_port)

Expand All @@ -99,7 +101,7 @@ def _decode_data(raw_response: bytes) -> Optional[RoombaInfo]:
return None


def _get_socket():
def _get_socket() -> socket.socket:
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
server_socket.settimeout(5)
Expand Down
57 changes: 38 additions & 19 deletions roombapy/entry_points.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,50 @@
from __future__ import annotations

import sys

from roombapy import RoombaFactory
from roombapy import RoombaFactory, RoombaInfo
from roombapy.discovery import RoombaDiscovery
from roombapy.getpassword import RoombaPassword


def discovery():
def discovery() -> None:
roomba_ip = _get_ip_from_arg()

roomba_discovery = RoombaDiscovery()
if roomba_ip is not None:
print(roomba_discovery.find(roomba_ip))
return

robots_info = roomba_discovery.find()
for robot in robots_info:
print(robot)
if isinstance(robots_info, set):
for robot in robots_info:
print(robot)
else:
print(robots_info)


def password():
def password() -> None:
roomba_ip = _get_ip_from_arg()
_validate_ip(roomba_ip)
_wait_for_input()

roomba_discovery = RoombaDiscovery()
roomba_info = roomba_discovery.find(roomba_ip)
assert roomba_info is not None
_validate_roomba_info(roomba_info)

roomba_password = RoombaPassword(roomba_ip)
info = roomba_info
if isinstance(roomba_info, set):
info = next(iter(roomba_info))

assert isinstance(info, RoombaInfo)

roomba_password = RoombaPassword(info.ip)
found_password = roomba_password.get_password()
roomba_info.password = found_password
print(roomba_info)
info.password = found_password
print(info)


def connect():
def connect() -> None:
roomba_ip = _get_ip_from_arg()
_validate_ip(roomba_ip)

Expand All @@ -42,34 +53,42 @@ def connect():

roomba_discovery = RoombaDiscovery()
roomba_info = roomba_discovery.find(roomba_ip)
assert roomba_info is not None
_validate_roomba_info(roomba_info)

roomba = RoombaFactory.create_roomba(
roomba_info.ip, roomba_info.blid, roomba_password
)
info = roomba_info
if isinstance(roomba_info, set):
info = next(iter(roomba_info))

assert isinstance(info, RoombaInfo)
assert roomba_password

roomba = RoombaFactory.create_roomba(info.ip, info.blid, roomba_password)
roomba.register_on_message_callback(lambda msg: print(msg))
roomba.connect()

while True:
pass


def _validate_ip(ip):
def _validate_ip(ip: str | None) -> None:
if ip is None:
raise Exception("ip cannot be null")


def _validate_password(ip):
def _validate_password(ip: str | None) -> None:
if ip is None:
raise Exception("password cannot be null")


def _validate_roomba_info(roomba_info):
def _validate_roomba_info(
roomba_info: RoombaInfo | set[RoombaInfo] | None,
) -> None:
if roomba_info is None:
raise Exception("cannot find roomba")


def _wait_for_input():
def _wait_for_input() -> None:
print(
"Roomba have to be on Home Base powered on.\n"
"Press and hold HOME button until you hear series of tones.\n"
Expand All @@ -78,13 +97,13 @@ def _wait_for_input():
input("Press Enter to continue...")


def _get_ip_from_arg():
def _get_ip_from_arg() -> str | None:
if len(sys.argv) < 2:
return None
return str(sys.argv[1])


def _get_password_from_arg():
def _get_password_from_arg() -> str | None:
if len(sys.argv) < 3:
return None
return str(sys.argv[2])
Loading

0 comments on commit 99fcf05

Please sign in to comment.