Skip to content

Commit

Permalink
Add GetMapSetV2 command (#372)
Browse files Browse the repository at this point in the history
Co-authored-by: Robert Resch <[email protected]>
  • Loading branch information
MVladislav and edenhaus authored Feb 15, 2024
1 parent 8f75ebf commit 3433f28
Show file tree
Hide file tree
Showing 9 changed files with 362 additions and 57 deletions.
3 changes: 3 additions & 0 deletions deebot_client/commands/json/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
GetCachedMapInfo,
GetMajorMap,
GetMapSet,
GetMapSetV2,
GetMapSubSet,
GetMapTrace,
GetMinorMap,
Expand Down Expand Up @@ -73,6 +74,7 @@
"GetCachedMapInfo",
"GetMajorMap",
"GetMapSet",
"GetMapSetV2",
"GetMapSubSet",
"GetMapTrace",
"GetMinorMap",
Expand Down Expand Up @@ -144,6 +146,7 @@
GetCachedMapInfo,
GetMajorMap,
GetMapSet,
GetMapSetV2,
GetMapSubSet,
GetMapTrace,
GetMinorMap,
Expand Down
108 changes: 94 additions & 14 deletions deebot_client/commands/json/map.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Maps commands."""
from __future__ import annotations

import json
from types import MappingProxyType
from typing import TYPE_CHECKING, Any

Expand All @@ -15,6 +16,7 @@
)
from deebot_client.events.map import CachedMapInfoEvent
from deebot_client.message import HandlingResult, HandlingState, MessageBodyDataDict
from deebot_client.util import decompress_7z_base64_data

from .common import JsonCommandWithMessageHandling

Expand All @@ -26,6 +28,22 @@ class GetCachedMapInfo(JsonCommandWithMessageHandling, MessageBodyDataDict):
"""Get cached map info command."""

name = "getCachedMapInfo"
# version definition for using type of getMapSet v1 or v2
_map_set_command: type[GetMapSet | GetMapSetV2]

def __init__(
self, args: dict[str, Any] | list[Any] | None = None, version: int = 1
) -> None:
match version:
case 1:
self._map_set_command = GetMapSet
case 2:
self._map_set_command = GetMapSetV2
case _:
error_wrong_version = f"version={version} is not supported"
raise ValueError(error_wrong_version)

super().__init__(args)

@classmethod
def _handle_body_data_dict(
Expand Down Expand Up @@ -59,7 +77,10 @@ def _handle_response(
return CommandResult(
result.state,
result.args,
[GetMapSet(result.args["map_id"], entry) for entry in MapSetType],
[
self._map_set_command(result.args["map_id"], entry)
for entry in MapSetType
],
)

return result
Expand Down Expand Up @@ -131,16 +152,24 @@ def _handle_body_data_dict(
:return: A message response
"""
subsets = [int(subset["mssid"]) for subset in data["subsets"]]
args = {
cls._ARGS_ID: data["mid"],
cls._ARGS_SET_ID: data.get("msid", None),
cls._ARGS_TYPE: data["type"],
cls._ARGS_SUBSETS: subsets,
}
if not MapSetType.has_value(data["type"]) or not data.get("subsets"):
return HandlingResult.analyse()

event_bus.notify(MapSetEvent(MapSetType(data["type"]), subsets))
return HandlingResult(HandlingState.SUCCESS, args)
if subset_ids := cls._get_subset_ids(event_bus, data):
event_bus.notify(MapSetEvent(MapSetType(data["type"]), subset_ids))
args = {
cls._ARGS_ID: data["mid"],
cls._ARGS_SET_ID: data.get("msid"),
cls._ARGS_TYPE: data["type"],
cls._ARGS_SUBSETS: subset_ids,
}
return HandlingResult(HandlingState.SUCCESS, args)
return HandlingResult(HandlingState.SUCCESS)

@classmethod
def _get_subset_ids(cls, _: EventBus, data: dict[str, Any]) -> list[int] | None:
"""Return subset ids."""
return [int(subset["mssid"]) for subset in data["subsets"]]

def _handle_response(
self, event_bus: EventBus, response: dict[str, Any]
Expand Down Expand Up @@ -205,7 +234,8 @@ def __init__(
type = type.value

if msid is None and type == MapSetType.ROOMS.value:
raise ValueError("msid is required when type='vw'")
error_msid_type = f"msid is required when type='{MapSetType.ROOMS.value}'"
raise ValueError(error_msid_type)

super().__init__(
{
Expand All @@ -225,18 +255,26 @@ def _handle_body_data_dict(
:return: A message response
"""
if MapSetType.has_value(data["type"]):
subtype = data.get("subtype", data.get("subType", None))
subtype = data.get("subtype", data.get("subType"))
name = None
if subtype == "15":
name = data.get("name", None)
name = data.get("name")
elif subtype:
name = cls._ROOM_NUM_TO_NAME.get(subtype, None)

# This command is used by new and old bots
if data.get("compress", 0) == 1:
# Newer bot's return coordinates as base64 decoded string
coordinates = decompress_7z_base64_data(data["value"]).decode()
else:
# Older bot's return coordinates direct as comma/semicolon separated list
coordinates = data["value"]

event_bus.notify(
MapSubsetEvent(
id=int(data["mssid"]),
type=MapSetType(data["type"]),
coordinates=data["value"],
coordinates=coordinates,
name=name,
)
)
Expand All @@ -246,6 +284,48 @@ def _handle_body_data_dict(
return HandlingResult.analyse()


class GetMapSetV2(GetMapSet):
"""Get map set v2 command."""

name = "getMapSet_V2"

@classmethod
def _get_subset_ids(
cls, event_bus: EventBus, data: dict[str, Any]
) -> list[int] | None:
"""Return subset ids."""
# subset is based64 7z compressed
subsets = json.loads(decompress_7z_base64_data(data["subsets"]).decode())

match data["type"]:
case MapSetType.ROOMS:
# subset values
# 1 -> id
# 2 -> unknown
# 3 -> unknown
# 4 -> room clean order
# 5 -> room center x
# 6 -> room center y
# 7 -> room clean configs as '<count>-<speed>-<water>'
# 8 -> named all as 'settingName1'
return [int(subset[0]) for subset in subsets]

case MapSetType.VIRTUAL_WALLS | MapSetType.NO_MOP_ZONES:
for subset in subsets:
mssid = subset[0] # first entry in list is mssid
coordinates = str(subset[1:]) # all other in list are coordinates

event_bus.notify(
MapSubsetEvent(
id=int(mssid),
type=MapSetType(data["type"]),
coordinates=coordinates,
)
)

return None


class GetMapTrace(JsonCommandWithMessageHandling, MessageBodyDataDict):
"""Get map trace command."""

Expand Down
8 changes: 4 additions & 4 deletions deebot_client/hardware/deebot/p95mgv.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@
from deebot_client.commands.json.error import GetError
from deebot_client.commands.json.fan_speed import GetFanSpeed, SetFanSpeed
from deebot_client.commands.json.life_span import GetLifeSpan, ResetLifeSpan

# getMapSet
from deebot_client.commands.json.map import (
GetCachedMapInfo,
GetMajorMap,
Expand Down Expand Up @@ -160,15 +158,17 @@
reset=ResetLifeSpan,
),
map=CapabilityMap(
chached_info=CapabilityEvent(CachedMapInfoEvent, [GetCachedMapInfo()]),
chached_info=CapabilityEvent(
CachedMapInfoEvent, [GetCachedMapInfo(version=2)]
),
changed=CapabilityEvent(MapChangedEvent, []),
major=CapabilityEvent(MajorMapEvent, [GetMajorMap()]),
multi_state=CapabilitySetEnable(
MultimapStateEvent, [GetMultimapState()], SetMultimapState
),
position=CapabilityEvent(PositionsEvent, [GetPos()]),
relocation=CapabilityExecute(SetRelocationState),
rooms=CapabilityEvent(RoomsEvent, [GetCachedMapInfo()]),
rooms=CapabilityEvent(RoomsEvent, [GetCachedMapInfo(version=2)]),
trace=CapabilityEvent(MapTraceEvent, [GetMapTrace()]),
),
network=CapabilityEvent(NetworkInfoEvent, [GetNetInfo()]),
Expand Down
36 changes: 9 additions & 27 deletions deebot_client/map.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from decimal import Decimal
from io import BytesIO
import itertools
import lzma
import struct
from typing import TYPE_CHECKING, Any, Final
import zlib
Expand All @@ -35,7 +34,11 @@
from .exceptions import MapError
from .logging_filter import get_logger
from .models import Room
from .util import OnChangedDict, OnChangedList
from .util import (
OnChangedDict,
OnChangedList,
decompress_7z_base64_data,
)

if TYPE_CHECKING:
from collections.abc import Callable, Coroutine, Sequence
Expand Down Expand Up @@ -117,7 +120,7 @@ class _PositionSvg:
_TRACE_MAP = "trace_map"
_COLORS = {
_TRACE_MAP: "#fff",
MapSetType.VIRTUAL_WALLS: "#f00",
MapSetType.VIRTUAL_WALLS: "#f00000",
MapSetType.NO_MOP_ZONES: "#ffa500",
}
_DEFAULT_MAP_BACKGROUND_COLOR = ImageColor.getrgb("#badaff") # floor
Expand Down Expand Up @@ -233,27 +236,6 @@ class BackgroundImage:
)


def _decompress_7z_base64_data(data: str) -> bytes:
_LOGGER.debug("[decompress7zBase64Data] Begin")
final_array = bytearray()

# Decode Base64
decoded = base64.b64decode(data)

i = 0
for idx in decoded:
if i == 8:
final_array += b"\x00\x00\x00\x00"
final_array.append(idx)
i += 1

dec = lzma.LZMADecompressor(lzma.FORMAT_AUTO, None, None)
decompressed_data = dec.decompress(final_array)

_LOGGER.debug("[decompress7zBase64Data] Done")
return decompressed_data


def _calc_value(value: float, axis_manipulation: AxisManipulation) -> float:
try:
if value is not None:
Expand Down Expand Up @@ -352,7 +334,7 @@ def _get_svg_subset(

# For any other points count, return a polygon that should fit any required shape
return svg.Polygon(
fill=_COLORS[subset.type] + "90", # Set alpha channel to 90 for fill color
fill=_COLORS[subset.type] + "30", # Set alpha channel to 30 for fill color
stroke=_COLORS[subset.type],
stroke_width=1.5,
stroke_dasharray=[4],
Expand Down Expand Up @@ -432,7 +414,7 @@ async def on_map_subset(event: MapSubsetEvent) -> None:

def _update_trace_points(self, data: str) -> None:
_LOGGER.debug("[_update_trace_points] Begin")
trace_points = _decompress_7z_base64_data(data)
trace_points = decompress_7z_base64_data(data)

for i in range(0, len(trace_points), 5):
position_x, position_y = struct.unpack("<hh", trace_points[i : i + 4])
Expand Down Expand Up @@ -683,7 +665,7 @@ def image(self) -> Image.Image:

def update_points(self, base64_data: str) -> None:
"""Add map piece points."""
decoded = _decompress_7z_base64_data(base64_data)
decoded = decompress_7z_base64_data(base64_data)
old_crc32 = self._crc32
self._crc32 = zlib.crc32(decoded)

Expand Down
8 changes: 7 additions & 1 deletion deebot_client/messages/json/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,23 @@
from typing import TYPE_CHECKING

from .battery import OnBattery
from .map import OnMapSetV2
from .stats import ReportStats

if TYPE_CHECKING:
from deebot_client.message import Message

__all__ = ["OnBattery", "ReportStats"]
__all__ = [
"OnBattery",
"OnMapSetV2",
"ReportStats",
]

# fmt: off
# ordered by file asc
_MESSAGES: list[type[Message]] = [
OnBattery,
OnMapSetV2,

ReportStats
]
Expand Down
35 changes: 35 additions & 0 deletions deebot_client/messages/json/map.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""Map set v2 messages."""
from __future__ import annotations

from typing import TYPE_CHECKING, Any

from deebot_client.events.map import MapSetType
from deebot_client.message import HandlingResult, HandlingState, MessageBodyDataDict

if TYPE_CHECKING:
from deebot_client.event_bus import EventBus


class OnMapSetV2(MessageBodyDataDict):
"""On map set v2 message."""

name = "onMapSet_V2"

@classmethod
def _handle_body_data_dict(
cls, _: EventBus, data: dict[str, Any]
) -> HandlingResult:
"""Handle message->body->data and notify the correct event subscribers.
:return: A message response
"""
# check if type is know and mid us given
if not MapSetType.has_value(data["type"]) or not data.get("mid"):
return HandlingResult.analyse()

# NOTE: here would be needed to call 'GetMapSetV2' again with 'mid' and 'type',
# that on event will update the map set changes,
# messages current cannot call commands again
return HandlingResult(
HandlingState.SUCCESS, {"mid": data["mid"], "type": data["type"]}
)
Loading

0 comments on commit 3433f28

Please sign in to comment.