Skip to content

Commit

Permalink
Do not always renew access token
Browse files Browse the repository at this point in the history
added some logging infos.
  • Loading branch information
moerpel authored Jun 2, 2024
1 parent 9f3cfd6 commit 37de970
Showing 1 changed file with 29 additions and 22 deletions.
51 changes: 29 additions & 22 deletions custom_components/HA_SMA_EVCharger/evcharger_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

_LOGGER = logging.getLogger(__name__)


class EvChargerAPI:
def __init__(self, api_url, username, password):
self.api_url = api_url
Expand All @@ -13,6 +14,8 @@ def __init__(self, api_url, username, password):
self.session = requests.Session()
self.access_token = None
self.refresh_token = None
self.run_count = 0


def authenticate(self):
"""Authenticate and obtain a token for subsequent API calls."""
Expand All @@ -31,37 +34,41 @@ def authenticate(self):
_LOGGER.debug(f"Access Token: {self.access_token}")
self.refresh_token = tokens.get("refresh_token")
_LOGGER.debug(f"Refresh Token: {self.refresh_token}")

_LOGGER.info(f"Authentication with: {self.api_url}")
if not self.access_token:
raise ValueError("Authentication failed, no token received")

# Update session headers with the received token
self.session.headers.update({"Authorization": f"Bearer {self.access_token}"})

def get_data(self):

token_url = f"{self.api_url}/api/v1/token"
auth_payload = {
'grant_type': 'password',
'username': self.username,
'password': self.password
}
response = self.session.post(token_url, data=auth_payload)
response.raise_for_status() # Raises an HTTPError if the HTTP request returned an unsuccessful status code
self.run_count +=1
if self.run_count > 20:
token_url = f"{self.api_url}/api/v1/token"
auth_payload = {
'grant_type': 'password',
'username': self.username,
'password': self.password
}
response = self.session.post(token_url, data=auth_payload)
response.raise_for_status() # Raises an HTTPError if the HTTP request returned an unsuccessful status code

tokens = response.json()
self.access_token = tokens.get("access_token")
#_LOGGER.debug(f"Access Token: {self.access_token}")
self.refresh_token = tokens.get("refresh_token")
#_LOGGER.debug(f"Refresh Token: {self.refresh_token}")

if not self.access_token:
raise ValueError("Authentication failed, no token received")
self.run_count = 0
_LOGGER.debug(f"Reauthenticated with access_token: {self.access_token}")

# Update session headers with the received token
self.session.headers.update({"Authorization": f"Bearer {self.access_token}"})

tokens = response.json()
self.access_token = tokens.get("access_token")
_LOGGER.debug(f"Access Token: {self.access_token}")
self.refresh_token = tokens.get("refresh_token")
_LOGGER.debug(f"Refresh Token: {self.refresh_token}")

if not self.access_token:
raise ValueError("Authentication failed, no token received")

# Update session headers with the received token
self.session.headers.update({"Authorization": f"Bearer {self.access_token}"})

"""Fetch the data from the API."""
_LOGGER.info(f"Fetching data from: {self.api_url}")
measurements_url = f"{self.api_url}/api/v1/measurements/live/"
payload = json.dumps([{"componentId": "IGULD:SELF"}])

Expand Down

0 comments on commit 37de970

Please sign in to comment.