Skip to content

Commit

Permalink
Moving code from home assistant into the backing lib for connectivity…
Browse files Browse the repository at this point in the history
… checking
  • Loading branch information
jeeftor committed Dec 11, 2023
1 parent 7708b36 commit 33c4982
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 12 deletions.
60 changes: 48 additions & 12 deletions example_cloud_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
import logging
import os

from httpx import Cookies
from httpx import ConnectError
from rich import print
from rich.logging import RichHandler

from intellifire4py import UnifiedFireplace
from intellifire4py.cloud_interface import IntelliFireCloudInterface

FORMAT = "%(message)s"
Expand Down Expand Up @@ -47,26 +48,61 @@ def get_creds() -> tuple[str, str]:
return (username, password)


async def _async_validate_connectivity(
fireplace: UnifiedFireplace, timeout=600
) -> tuple[bool, bool]:
"""Check local and cloud connectivity."""

async def with_timeout(coroutine):
try:
await asyncio.wait_for(coroutine, timeout)
return True
except asyncio.TimeoutError:
return False
except ConnectError:
return False
except Exception as err: # pylint: disable=broad-except
print(err)
return False

local_future = with_timeout(fireplace.perform_local_poll())
cloud_future = with_timeout(fireplace.perform_cloud_poll())

local_success, cloud_success = await asyncio.gather(local_future, cloud_future)
return local_success, cloud_success


async def main() -> None:
"""Define main function."""
username, password = get_creds()

cloud_api_interface = IntelliFireCloudInterface(use_http=True, verify_ssl=False)
cloud_api_interface = (
IntelliFireCloudInterface()
) # use_http=True, verify_ssl=False)

await cloud_api_interface.login_with_cookie(cookie=Cookies())
# user_json: str = os.getenv("USER_JSON") # type: ignore
#
# cloud_api_interface.load_user_data(json_str=user_json)
# await cloud_api_interface.login_with_cookie(cookie=Cookies())
user_json: str = os.getenv("USER_JSON") # type: ignore
#
# # await cloud_api_interface.login(username=username, password=password)
cloud_api_interface.load_user_data(json_str=user_json)

# await cloud_api_interface.login_with_credentials(
# username=username, password=password
# )
#
# user_data = cloud_api_interface.user_data
user_data = cloud_api_interface.user_data

# print(user_data.model_dump_json(indent=2))
#
# fireplaces: UnifiedFireplace = (
# await UnifiedFireplace.build_fireplaces_from_user_data(user_data)
# )
# fireplace = fireplaces[0]
fireplaces: UnifiedFireplace = (
await UnifiedFireplace.build_fireplaces_from_user_data(user_data)
)
fireplace = fireplaces[0]

# Tweak Local IP

local, cloud = await _async_validate_connectivity(fireplace)

print(local, cloud)
#
# await fireplace.set_read_mode(IntelliFireApiMode.LOCAL)
# await fireplace.set_control_mode(IntelliFireApiMode.CLOUD)
Expand Down
3 changes: 3 additions & 0 deletions src/intellifire4py/local_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import logging

import httpx
from httpx import ConnectError

from intellifire4py.model import (
IntelliFirePollData,
Expand Down Expand Up @@ -206,6 +207,8 @@ async def poll(self, suppress_warnings: bool = False) -> None:
self._log.warning("Timeout error on polling")
except httpx.ReadTimeout:
self._log.warning(f"Timeout on reading {url}")
except ConnectError as err:
raise ConnectionError from err

async def send_command(
self,
Expand Down
59 changes: 59 additions & 0 deletions src/intellifire4py/unified_fireplace.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import asyncio
import logging

from httpx import ConnectError

from intellifire4py import IntelliFireAPILocal, IntelliFireAPICloud
from intellifire4py.const import IntelliFireApiMode
from intellifire4py.control import IntelliFireController
Expand All @@ -17,6 +19,8 @@
from intellifire4py.read import IntelliFireDataProvider

from typing import cast
from typing import Any
from collections.abc import Coroutine


class UnifiedFireplace:
Expand All @@ -38,6 +42,9 @@ class UnifiedFireplace:
_local_api: IntelliFireAPILocal
_cloud_api: IntelliFireAPICloud

cloud_connectivity: bool | None = None
local_connectivity: bool | None = None

def __init__(
self,
fireplace_data: IntelliFireCommonFireplaceData,
Expand Down Expand Up @@ -504,3 +511,55 @@ class instance, including its methods and properties. It is especially useful
for debugging purposes to understand the state of an object in a rich, readable format.
"""
inspect(self, methods=True, help=True)

async def async_validate_connectivity(
self, timeout: int = 600
) -> tuple[bool, bool]:
"""Asynchronously validate connectivity for both local and cloud services.
This function checks the connectivity status for local and cloud services
by awaiting on two asynchronous tasks. Each task has a specified timeout,
after which it is considered unsuccessful.
Parameters:
timeout (int): The maximum time in seconds to wait for each connectivity check.
Returns:
tuple[bool, bool]: A tuple containing two boolean values. The first boolean
indicates the success of the local connectivity check,
and the second indicates the success of the cloud connectivity check.
"""

async def with_timeout(coroutine: Coroutine[Any, Any, Any]) -> bool:
"""Helper function to run a coroutine with a timeout.
If the coroutine does not complete within the specified timeout,
or if a ConnectError is raised, it returns False. Otherwise, it returns True.
Parameters:
coroutine: The coroutine to be executed with a timeout.
Returns:
bool: True if the coroutine completes successfully within the timeout, False otherwise.
"""
try:
await asyncio.wait_for(coroutine, timeout)
return True
except asyncio.TimeoutError:
return False
except ConnectError:
return False

# Initiate asynchronous connectivity checks for local and cloud.
local_future = with_timeout(self.perform_local_poll())
cloud_future = with_timeout(self.perform_cloud_poll())

# Await the completion of both connectivity checks.
local_success, cloud_success = await asyncio.gather(local_future, cloud_future)

# Update instance variables with the results of the connectivity checks.
self.cloud_connectivity = cloud_success
self.local_connectivity = local_success

# Return the results of the connectivity checks.
return local_success, cloud_success

0 comments on commit 33c4982

Please sign in to comment.