Skip to content

Commit

Permalink
qingping_cgdk2: refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
devbis committed May 13, 2023
1 parent 4c31854 commit 933ea1b
Showing 1 changed file with 29 additions and 50 deletions.
79 changes: 29 additions & 50 deletions ble2mqtt/devices/qingping_cgdk2.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,21 @@
import io
import logging
import struct
import typing as ty
import uuid
from dataclasses import dataclass

from bleak.backends.device import BLEDevice

from ..protocols.xiaomi import parse_fe95_advert
from ..utils import format_binary
from .base import ConnectionMode
from .uuids import BATTERY
from .xiaomi_base import XiaomiHumidityTemperature

_LOGGER = logging.getLogger(__name__)

SERVICE_DATA_UUID = 'fdcd'

MAC_START = 2
MAC_END = 8
SERVICE_DATA_UUID = uuid.UUID('0000fdcd-0000-1000-8000-00805f9b34fb')


@dataclass
class SensorState:
battery: int = 0
temperature: float = 0
humidity: float = 0
mac: str = ""
temperature: float = 0.0
humidity: float = 0.0


class QingpingTempRHMonitorLite(XiaomiHumidityTemperature):
Expand All @@ -38,39 +27,29 @@ class QingpingTempRHMonitorLite(XiaomiHumidityTemperature):
SUPPORT_PASSIVE = True
ACTIVE_CONNECTION_MODE = ConnectionMode.ACTIVE_POLL_WITH_DISCONNECT

def filter_notifications(self, sender):
return True

def handle_advert(self, scanned_device: BLEDevice, adv_data):
service_data = adv_data.service_data
adv_data = service_data.get(str("0000fdcd-0000-1000-8000-00805f9b34fb"))
if adv_data is None :
return

data_str = format_binary(adv_data," ")

# adv_data = bytes(adv_data)

preeamble = "cdfd88"
packetStart = data_str.find(preeamble)
offset = packetStart + len(preeamble)
strippedData_str = data_str[6:len(data_str)]
mac = ':'.join(list(reversed(strippedData_str[0:17].split(' '))))
dataIdentifier = data_str[(offset-2):offset].upper()

if dataIdentifier == "10":
data = io.BytesIO(adv_data)
data.seek(10)
temperature = int(format(struct.unpack("<H", data.read(2))[0], '02x'), base=16) /10
data.seek(12)
humidity = int(format(struct.unpack("<H", data.read(2))[0], '02x'), base=16) /10
data.seek(16)
batteryPercent = int(format_binary(data.read(2)),base=16)
self._state = self.SENSOR_CLASS(
temperature=temperature,
humidity=humidity,
battery=batteryPercent,
mac=mac
)


PREAMBLE = b'\xcd\xfd'

def filter_notifications(self, sender, data):
packet_start = data.find(self.PREAMBLE)
if packet_start == -1:
return False
return data[packet_start + len(self.PREAMBLE) + 1] == 0x10

def process_data(self, data):
packet_start = data.find(self.PREAMBLE)
offset = packet_start + len(self.PREAMBLE)
value_data = data[offset:]

self._state = self.SENSOR_CLASS(
temperature=int.from_bytes(
value_data[10:12],
byteorder='little',
signed=True,
) / 10,
humidity=int.from_bytes(
value_data[12:14],
byteorder='little',
signed=False,
) / 10,
battery=value_data[16],
)

0 comments on commit 933ea1b

Please sign in to comment.