-
Notifications
You must be signed in to change notification settings - Fork 117
Refactor user tokens, introduce Logfire client #981
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
59d509c
b349cc0
5b0a8c3
adad73e
48eb4d0
cf0860f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,37 +1,189 @@ | ||
from __future__ import annotations | ||
|
||
import platform | ||
import re | ||
import sys | ||
import warnings | ||
from dataclasses import dataclass | ||
from datetime import datetime, timezone | ||
from pathlib import Path | ||
from typing import TypedDict | ||
from typing import TypedDict, cast | ||
from urllib.parse import urljoin | ||
|
||
if sys.version_info >= (3, 9): | ||
from functools import cache | ||
else: | ||
from functools import lru_cache | ||
|
||
cache = lru_cache(maxsize=None) | ||
|
||
import requests | ||
from rich.prompt import IntPrompt | ||
from typing_extensions import Self | ||
|
||
from logfire.exceptions import LogfireConfigError | ||
|
||
from .utils import UnexpectedResponse | ||
from .utils import UnexpectedResponse, read_toml_file | ||
|
||
HOME_LOGFIRE = Path.home() / '.logfire' | ||
"""Folder used to store global configuration, and user tokens.""" | ||
DEFAULT_FILE = HOME_LOGFIRE / 'default.toml' | ||
"""File used to store user tokens.""" | ||
|
||
|
||
PYDANTIC_LOGFIRE_TOKEN_PATTERN = re.compile( | ||
r'^(?P<safe_part>pylf_v(?P<version>[0-9]+)_(?P<region>[a-z]+)_)(?P<token>[a-zA-Z0-9]+)$' | ||
) | ||
|
||
|
||
class _RegionData(TypedDict): | ||
base_url: str | ||
gcp_region: str | ||
|
||
|
||
REGIONS: dict[str, _RegionData] = { | ||
'us': { | ||
'base_url': 'https://logfire-us.pydantic.dev', | ||
'gcp_region': 'us-east4', | ||
}, | ||
'eu': { | ||
'base_url': 'https://logfire-eu.pydantic.dev', | ||
'gcp_region': 'europe-west4', | ||
}, | ||
} | ||
"""The existing Logfire regions.""" | ||
|
||
|
||
class UserTokenData(TypedDict): | ||
"""User token data.""" | ||
|
||
token: str | ||
expiration: str | ||
|
||
|
||
class DefaultFile(TypedDict): | ||
"""Content of the default.toml file.""" | ||
class UserTokensFileData(TypedDict): | ||
"""Content of the file containing the user tokens.""" | ||
|
||
tokens: dict[str, UserTokenData] | ||
|
||
|
||
@dataclass | ||
class UserToken: | ||
"""A user token.""" | ||
|
||
token: str | ||
base_url: str | ||
expiration: str | ||
|
||
@classmethod | ||
def from_user_token_data(cls, base_url: str, token: UserTokenData) -> Self: | ||
return cls( | ||
token=token['token'], | ||
base_url=base_url, | ||
expiration=token['expiration'], | ||
) | ||
|
||
@property | ||
def is_expired(self) -> bool: | ||
return datetime.now(tz=timezone.utc) >= datetime.fromisoformat(self.expiration.rstrip('Z')).replace( | ||
tzinfo=timezone.utc | ||
) | ||
|
||
def __str__(self) -> str: | ||
# TODO define in this module? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think so, yes |
||
from .config import PYDANTIC_LOGFIRE_TOKEN_PATTERN, REGIONS | ||
|
||
region = 'us' | ||
if match := PYDANTIC_LOGFIRE_TOKEN_PATTERN.match(self.token): | ||
region = match.group('region') | ||
if region not in REGIONS: | ||
region = 'us' | ||
Comment on lines
+94
to
+100
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i think this should be a separate method/function |
||
|
||
token_repr = f'{region.upper()} ({self.base_url}) - ' | ||
if match: | ||
token_repr += match.group('safe_part') + match.group('token')[:5] | ||
else: | ||
token_repr += self.token[:5] | ||
token_repr += '****' | ||
return token_repr | ||
|
||
|
||
@dataclass | ||
class UserTokenCollection: | ||
"""A collection of user tokens.""" | ||
|
||
user_tokens: dict[str, UserToken] | ||
"""A mapping between base URLs and user tokens.""" | ||
|
||
@classmethod | ||
def empty(cls) -> Self: | ||
"""Create an empty user token collection.""" | ||
return cls(user_tokens={}) | ||
|
||
@classmethod | ||
def from_file_data(cls, file_data: UserTokensFileData) -> Self: | ||
return cls(user_tokens={url: UserToken(base_url=url, **data) for url, data in file_data['tokens'].items()}) | ||
|
||
@classmethod | ||
def from_tokens_file(cls, file: Path) -> Self: | ||
return cls.from_file_data(cast(UserTokensFileData, read_toml_file(file))) | ||
|
||
def get_token(self, base_url: str | None = None) -> UserToken: | ||
tokens_list = list(self.user_tokens.values()) | ||
if base_url is not None: | ||
token = next((t for t in tokens_list if t.base_url == base_url), None) | ||
if token is None: | ||
raise LogfireConfigError( | ||
f'No user token was found matching the {base_url} Logfire URL. ' | ||
'Please run `logfire auth` to authenticate.' | ||
) | ||
else: | ||
if len(tokens_list) == 1: | ||
token = tokens_list[0] | ||
elif len(tokens_list) >= 2: | ||
choices_str = '\n'.join( | ||
f'{i}. {token} ({"expired" if token.is_expired else "valid"})' | ||
for i, token in enumerate(tokens_list, start=1) | ||
) | ||
int_choice = IntPrompt.ask( | ||
f'Multiple user tokens found. Please select one:\n{choices_str}\n', | ||
choices=[str(i) for i in range(1, len(tokens_list) + 1)], | ||
) | ||
token = tokens_list[int_choice - 1] | ||
else: # tokens_list == [] | ||
raise LogfireConfigError('No user tokens are available. Please run `logfire auth` to authenticate.') | ||
|
||
if token.is_expired: | ||
raise LogfireConfigError(f'User token {token} is expired. Pleas run `logfire auth` to authenticate.') | ||
return token | ||
|
||
def is_logged_in(self, base_url: str | None = None) -> bool: | ||
if base_url is not None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
tokens = (t for t in self.user_tokens.values() if t.base_url == base_url) | ||
else: | ||
tokens = self.user_tokens.values() | ||
return any(not t.is_expired for t in tokens) | ||
|
||
def add_token(self, base_url: str, token: UserTokenData) -> UserToken: | ||
user_token = UserToken.from_user_token_data(base_url, token) | ||
self.user_tokens[base_url] = UserToken.from_user_token_data(base_url, token) | ||
return user_token | ||
|
||
def dump(self, path: Path) -> None: | ||
# There's no standard library package to write TOML files, so we'll write it manually. | ||
with path.open('w') as f: | ||
for base_url, user_token in self.user_tokens.items(): | ||
f.write(f'[tokens."{base_url}"]\n') | ||
f.write(f'token = "{user_token.token}"\n') | ||
f.write(f'expiration = "{user_token.expiration}"\n') | ||
|
||
|
||
@cache | ||
def default_token_collection() -> UserTokenCollection: | ||
"""The default token collection, created from the `~/.logfire/default.toml` file.""" | ||
return UserTokenCollection.from_tokens_file(DEFAULT_FILE) | ||
|
||
|
||
class NewDeviceFlow(TypedDict): | ||
"""Matches model of the same name in the backend.""" | ||
|
||
|
@@ -91,17 +243,3 @@ def poll_for_token(session: requests.Session, device_code: str, base_api_url: st | |
opt_user_token: UserTokenData | None = res.json() | ||
if opt_user_token: | ||
return opt_user_token | ||
|
||
|
||
def is_logged_in(data: DefaultFile, logfire_url: str) -> bool: | ||
"""Check if the user is logged in. | ||
|
||
Returns: | ||
True if the user is logged in, False otherwise. | ||
""" | ||
for url, info in data['tokens'].items(): # pragma: no branch | ||
# token expirations are in UTC | ||
expiry_date = datetime.fromisoformat(info['expiration'].rstrip('Z')).replace(tzinfo=timezone.utc) | ||
if url == logfire_url and datetime.now(tz=timezone.utc) < expiry_date: # pragma: no branch | ||
return True | ||
return False # pragma: no cover |
Uh oh!
There was an error while loading. Please reload this page.