Skip to content

Commit

Permalink
Use PosixPath for path handling and create parent folders if they not…
Browse files Browse the repository at this point in the history
… exist (#271)

* Use PosixPath for path handling and create parent folders if they not exist
  • Loading branch information
CM000n authored Jan 5, 2024
1 parent 6e651ed commit b178acc
Showing 1 changed file with 21 additions and 20 deletions.
41 changes: 21 additions & 20 deletions mytoyota/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
from datetime import datetime, timedelta
from http import HTTPStatus
from os.path import exists, expanduser
from pathlib import Path
from typing import Any, Dict, Optional
from urllib import parse

Expand All @@ -22,7 +22,7 @@

_LOGGER: logging.Logger = logging.getLogger(__package__)

CACHE_FILENAME: Optional[str] = expanduser("~/.cache/toyota_credentials_cache_contains_secrets")
CACHE_FILENAME: Path = Path.home() / ".cache" / "toyota_credentials_cache_contains_secrets"


# TODO There is an issue if you login with the application on a phone as all the tokens change. # noqa: E501
Expand All @@ -47,8 +47,8 @@ def __init__(self, username: str, password: str, timeout: int = 60) -> None:
self._authorize_url = httpx.URL(AUTHORIZE_URL)

# Do we have a cache file?
if CACHE_FILENAME and exists(CACHE_FILENAME):
with open(CACHE_FILENAME, "r", encoding="utf-8") as f:
if CACHE_FILENAME.exists():
with open(str(CACHE_FILENAME), "r", encoding="utf-8") as f:
cache_data = json.load(f)
if self._username == cache_data["username"]:
self._token = cache_data["access_token"]
Expand Down Expand Up @@ -160,7 +160,7 @@ async def _refresh_tokens(self) -> None:

self._update_tokens(resp.json())

def _update_tokens(self, resp: json):
def _update_tokens(self, resp: Dict[str, Any]):
access_tokens: Dict[str, Any] = resp
if (
"access_token" not in access_tokens
Expand All @@ -169,9 +169,10 @@ def _update_tokens(self, resp: json):
or "expires_in" not in access_tokens
):
raise ToyotaLoginError(
f"Token retrieval failed. Missing Tokens. {resp.status_code}, {resp.text}."
f"Token retrieval failed. Missing Tokens. \
{access_tokens.status_code}, \
{access_tokens.text}."
)

self._token = access_tokens["access_token"]
self._refresh_token = access_tokens["refresh_token"]
self._uuid = jwt.decode(
Expand All @@ -182,20 +183,20 @@ def _update_tokens(self, resp: json):
)["uuid"]
self._token_expiration = datetime.now() + timedelta(seconds=access_tokens["expires_in"])

if CACHE_FILENAME:
with open(CACHE_FILENAME, "w", encoding="utf-8") as f:
f.write(
json.dumps(
{
"access_token": self._token,
"refresh_token": self._refresh_token,
"uuid": self._uuid,
"expiration": self._token_expiration,
"username": self._username,
},
default=str,
)
CACHE_FILENAME.parent.mkdir(parents=True, exist_ok=True)
with open(str(CACHE_FILENAME), "w", encoding="utf-8") as f:
f.write(
json.dumps(
{
"access_token": self._token,
"refresh_token": self._refresh_token,
"uuid": self._uuid,
"expiration": self._token_expiration,
"username": self._username,
},
default=str,
)
)

async def request_raw( # noqa: PLR0913
self,
Expand Down

0 comments on commit b178acc

Please sign in to comment.