Skip to content

Commit

Permalink
Refactor event stream (#13)
Browse files Browse the repository at this point in the history
* Refactor event stream to use only one

* Remove commented-out code

* Change code as requested
  • Loading branch information
BraveChicken1 authored Mar 9, 2022
1 parent 926b9bf commit 0bbf191
Showing 1 changed file with 67 additions and 31 deletions.
98 changes: 67 additions & 31 deletions homeconnect/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ def __init__(
self.redirect_uri = redirect_uri
self.token_updater = token_updater

self._appliances = {}
self.listening_events = False

extra = {"client_id": self.client_id, "client_secret": self.client_secret}

self._oauth = OAuth2Session(
Expand Down Expand Up @@ -127,8 +130,22 @@ def delete(self, endpoint):
def get_appliances(self):
"""Return a list of `HomeConnectAppliance` instances for all
appliances."""

appliances = {}

data = self.get(ENDPOINT_APPLIANCES)
return [HomeConnectAppliance(self, **app) for app in data["homeappliances"]]
for home_appliance in data["homeappliances"]:
haId = home_appliance["haId"]

if haId in self._appliances:
appliances[haId] = self._appliances[haId]
appliances[haId].connected = home_appliance["connected"]
continue

appliances[haId] = HomeConnectAppliance(self, **home_appliance)

self._appliances = appliances
return list(self._appliances.values())

def get_authurl(self):
"""Get the URL needed for the authorization code grant flow."""
Expand All @@ -137,6 +154,49 @@ def get_authurl(self):
)
return authorization_url

def listen_events(self):
"""Spawn a thread with an event listener that updates the status."""
self.listening_events = True
uri = f"{self.host}/api/homeappliances/events"
sse = SSEClient(uri, session=self._oauth, retry=1000, timeout=TIMEOUT_S)
Thread(target=self._listen, args=[sse]).start()

def _listen(self, sse):
"""Worker function for listener."""
LOGGER.info("Listening to event stream for all devices")
try:
for event in sse:
try:
for appliance in self._appliances.values():
if appliance.haId == event.id:
self.handle_event(event, appliance)
break
except ValueError:
pass
except TokenExpiredError:
LOGGER.info("Token expired in event stream.")
self._oauth.token = self.refresh_tokens()
uri = f"{self.host}/api/homeappliances/events"
sse = SSEClient(uri, session=self._oauth, retry=1000, timeout=TIMEOUT_S)
self._listen(sse)

def handle_event(self, event, appliance):
"""Handle a new event.
Updates the status with the event data and executes any callback
function."""
event_data = json.loads(event.data)
d = self.json2dict(event_data["items"])
appliance.status.update(d)
if appliance.event_callback is not None:
appliance.event_callback(appliance)

@staticmethod
def json2dict(lst):
"""Turn a list of dictionaries where one key is called 'key'
into a dictionary with the value of 'key' as key."""
return {d.pop("key"): d for d in lst}


class HomeConnect(HomeConnectAPI):
"""Connection to the HomeConnect OAuth API."""
Expand Down Expand Up @@ -212,6 +272,8 @@ def __init__(
self.connected = connected
self.status = {}

self.event_callback = None

def __repr__(self):
return "HomeConnectAppliance(hc, haId='{}', vib='{}', brand='{}', type='{}', name='{}', enumber='{}', connected={})".format(
self.haId,
Expand All @@ -224,44 +286,18 @@ def __repr__(self):
)

def listen_events(self, callback=None):
"""Spawn a thread with an event listener that updates the status."""
uri = f"{self.hc.host}/api/homeappliances/{self.haId}/events"
sse = SSEClient(uri, session=self.hc._oauth, retry=1000, timeout=TIMEOUT_S)
Thread(target=self._listen, args=(sse, callback)).start()
"""Register event callback method"""
self.event_callback = callback

def _listen(self, sse, callback=None):
"""Worker function for listener."""
LOGGER.info("Listening to event stream for device %s", self.name)
try:
for event in sse:
try:
self.handle_event(event, callback)
except ValueError:
pass
except TokenExpiredError as e:
LOGGER.info("Token expired in event stream.")
self.hc._oauth.token = self.hc.refresh_tokens()
uri = f"{self.hc.host}/api/homeappliances/{self.haId}/events"
sse = SSEClient(uri, session=self.hc._oauth, retry=1000, timeout=TIMEOUT_S)
self._listen(sse, callback=callback)
if not self.hc.listening_events:
self.hc.listen_events()

@staticmethod
def json2dict(lst):
"""Turn a list of dictionaries where one key is called 'key'
into a dictionary with the value of 'key' as key."""
return {d.pop("key"): d for d in lst}

def handle_event(self, event, callback=None):
"""Handle a new event.
Updates the status with the event data and executes any callback
function."""
event = json.loads(event.data)
d = self.json2dict(event["items"])
self.status.update(d)
if callback is not None:
callback(self)

def get(self, endpoint):
"""Get data (as dictionary) from an endpoint."""
return self.hc.get("{}/{}{}".format(ENDPOINT_APPLIANCES, self.haId, endpoint))
Expand Down

0 comments on commit 0bbf191

Please sign in to comment.