From 59d509cc3add63b0e5b7027c34983a87160ed7dd Mon Sep 17 00:00:00 2001 From: Viicos <65306057+Viicos@users.noreply.github.com> Date: Thu, 3 Apr 2025 18:03:12 +0200 Subject: [PATCH 1/6] Add explicit type hint for `UnexpectedResponse.response` It is guaranteed to be set --- logfire/_internal/utils.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/logfire/_internal/utils.py b/logfire/_internal/utils.py index e3293616..99bfa34a 100644 --- a/logfire/_internal/utils.py +++ b/logfire/_internal/utils.py @@ -171,6 +171,8 @@ def span_to_dict(span: ReadableSpan) -> ReadableSpanDict: class UnexpectedResponse(RequestException): """An unexpected response was received from the server.""" + response: Response # type: ignore (guaranteed to exist) + def __init__(self, response: Response) -> None: super().__init__(f'Unexpected response: {response.status_code}', response=response) From b349cc0c3e430ebbd44fc876396e5f815aedc278 Mon Sep 17 00:00:00 2001 From: Viicos <65306057+Viicos@users.noreply.github.com> Date: Thu, 3 Apr 2025 18:05:15 +0200 Subject: [PATCH 2/6] Add basic Logfire client --- logfire/_internal/client.py | 102 ++++++++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100644 logfire/_internal/client.py diff --git a/logfire/_internal/client.py b/logfire/_internal/client.py new file mode 100644 index 00000000..c8317075 --- /dev/null +++ b/logfire/_internal/client.py @@ -0,0 +1,102 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any +from urllib.parse import urljoin + +from requests import Response, Session + +from logfire.exceptions import LogfireConfigError +from logfire.version import VERSION + +from .utils import UnexpectedResponse + +if TYPE_CHECKING: + from .auth import UserToken + + +UA_HEADER = f'logfire/{VERSION}' + + +class ProjectAlreadyExists(Exception): + pass + + +class InvalidProjectName(Exception): + def __init__(self, reason: str, /) -> None: + self.reason = reason + + +class LogfireClient: + def __init__(self, user_token: UserToken) -> None: + if user_token.is_expired: + raise RuntimeError + self.base_url = user_token.base_url + self._token = user_token.token + self._session = Session() + self._session.headers.update({'Authorization': self._token, 'User-Agent': UA_HEADER}) + + def _get(self, endpoint: str) -> Response: + response = self._session.get(urljoin(self.base_url, endpoint)) + UnexpectedResponse.raise_for_status(response) + return response + + def _post(self, endpoint: str, body: Any | None = None) -> Response: + response = self._session.post(urljoin(self.base_url, endpoint), json=body) + UnexpectedResponse.raise_for_status(response) + return response + + def get_user_organizations(self) -> list[dict[str, Any]]: + """Get the organizations of the logged-in user.""" + try: + response = self._get('/v1/organizations') + except UnexpectedResponse as e: + raise LogfireConfigError('Error retrieving list of organizations') from e + return response.json() + + def get_user_information(self) -> dict[str, Any]: + """Get information about the logged-in user.""" + try: + response = self._get('/v1/account/me') + except UnexpectedResponse as e: + raise LogfireConfigError('Error retrieving user information') from e + return response.json() + + def get_user_projects(self) -> list[dict[str, Any]]: + """Get the projects of the logged-in user.""" + try: + response = self._get('/v1/projects') + except UnexpectedResponse as e: # pragma: no cover + raise LogfireConfigError('Error retrieving list of projects') from e + return response.json() + + def create_new_project(self, organization: str, project_name: str): + """Create a new project. + + Args: + organization: The organization that should hold the new project. + project_name: The name of the project to be created. + + Returns: + The newly created project. + """ + try: + response = self._post(f'/v1/projects/{organization}', body={'project_name': project_name}) + except UnexpectedResponse as e: + r = e.response + if r.status_code == 409: + raise ProjectAlreadyExists + if r.status_code == 422: + error = r.json()['detail'][0] + if error['loc'] == ['body', 'project_name']: # pragma: no branch + raise InvalidProjectName(error['msg']) + + raise LogfireConfigError('Error creating new project') + return response.json() + + def create_write_token(self, organization: str, project_name: str) -> dict[str, Any]: + """Create a write token for the given project in the given organization.""" + try: + response = self._post(f'/v1/organizations/{organization}/projects/{project_name}/write-tokens') + except UnexpectedResponse as e: + raise LogfireConfigError('Error creating project write token') from e + return response.json() From 5b0a8c374d982ec37e53dd532629b45e222b0902 Mon Sep 17 00:00:00 2001 From: Viicos <65306057+Viicos@users.noreply.github.com> Date: Thu, 3 Apr 2025 18:34:31 +0200 Subject: [PATCH 3/6] Add structure to represent user tokens --- logfire/_internal/auth.py | 154 +++++++++++++++++++++++++++++++++----- 1 file changed, 136 insertions(+), 18 deletions(-) diff --git a/logfire/_internal/auth.py b/logfire/_internal/auth.py index 96a01454..1b6e18be 100644 --- a/logfire/_internal/auth.py +++ b/logfire/_internal/auth.py @@ -1,17 +1,28 @@ from __future__ import annotations import platform +import sys import warnings +from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path -from typing import TypedDict +from typing import TypedDict, cast from urllib.parse import urljoin +if sys.version_info >= (3, 9): + from functools import cache +else: + from functools import lru_cache + + cache = lru_cache(maxsize=None) + import requests +from rich.prompt import IntPrompt +from typing_extensions import Self from logfire.exceptions import LogfireConfigError -from .utils import UnexpectedResponse +from .utils import UnexpectedResponse, read_toml_file HOME_LOGFIRE = Path.home() / '.logfire' """Folder used to store global configuration, and user tokens.""" @@ -26,12 +37,133 @@ class UserTokenData(TypedDict): expiration: str -class DefaultFile(TypedDict): - """Content of the default.toml file.""" +class TokensFileData(TypedDict): + """Content of the file containing the user tokens.""" tokens: dict[str, UserTokenData] +@dataclass +class UserToken: + """A user token.""" + + token: str + base_url: str + expiration: str + + @classmethod + def from_user_token_data(cls, base_url: str, token: UserTokenData) -> Self: + return cls( + token=token['token'], + base_url=base_url, + expiration=token['expiration'], + ) + + @property + def is_expired(self) -> bool: + return datetime.now(tz=timezone.utc) >= datetime.fromisoformat(self.expiration.rstrip('Z')).replace( + tzinfo=timezone.utc + ) + + def __str__(self) -> str: + # TODO define in this module? + from .config import PYDANTIC_LOGFIRE_TOKEN_PATTERN, REGIONS + + region = 'us' + if match := PYDANTIC_LOGFIRE_TOKEN_PATTERN.match(self.token): + region = match.group('region') + if region not in REGIONS: + region = 'us' + + token_repr = f'{region.upper()} ({self.base_url}) - ' + if match: + token_repr += match.group('safe_part') + match.group('token')[:5] + else: + token_repr += self.token[:5] + token_repr += '****' + return token_repr + + +@dataclass +class UserTokenCollection: + """A collection of user tokens.""" + user_tokens: list[UserToken] + + @classmethod + def from_tokens(cls, tokens: TokensFileData) -> Self: + return cls( + user_tokens=[ + UserToken.from_user_token_data(url, data) # pyright: ignore[reportArgumentType], waiting for PEP 728 + for url, data in tokens.items() + ] + ) + + @classmethod + def from_tokens_file(cls, file: Path) -> Self: + return cls.from_tokens(cast(TokensFileData, read_toml_file(file))) + + def get_token(self, base_url: str | None = None) -> UserToken: + if base_url is not None: + token = next((t for t in self.user_tokens if t.base_url == base_url), None) + if token is None: + raise LogfireConfigError( + f'No user token was found matching the {base_url} Logfire URL. ' + 'Please run `logfire auth` to authenticate.' + ) + else: + if len(self.user_tokens) == 1: + token = self.user_tokens[0] + elif len(self.user_tokens) >= 2: + choices_str = '\n'.join( + f'{i}. {token} ({"expired" if token.is_expired else "valid"})' + for i, token in enumerate(self.user_tokens, start=1) + ) + int_choice = IntPrompt.ask( + f'Multiple user tokens found. Please select one:\n{choices_str}\n', + choices=[str(i) for i in range(1, len(self.user_tokens) + 1)], + ) + token = self.user_tokens[int_choice - 1] + else: # self.user_tokens == [] + raise LogfireConfigError('No user tokens are available. Please run `logfire auth` to authenticate.') + + if token.is_expired: + raise LogfireConfigError(f'User token {token} is expired. Pleas run `logfire auth` to authenticate.') + return token + + def is_logged_in(self, base_url: str | None = None) -> bool: + if base_url is not None: + tokens = (t for t in self.user_tokens if t.base_url == base_url) + else: + tokens = self.user_tokens + return any(not t.is_expired for t in tokens) + + def add_token(self, base_url: str, token: UserTokenData) -> UserToken: + existing_token = next((t for t in self.user_tokens if t.base_url == base_url), None) + if existing_token: + token_index = self.user_tokens.index(existing_token) + self.user_tokens.remove(existing_token) + else: + token_index = len(self.user_tokens) + + user_token = UserToken.from_user_token_data(base_url, token) + self.user_tokens.insert(token_index, user_token) + return user_token + + def dump(self, path: Path) -> None: + # There's no standard library package to write TOML files, so we'll write it manually. + with path.open('w') as f: + for user_token in self.user_tokens: + f.write(f'[tokens."{user_token.base_url}"]\n') + f.write(f'token = "{user_token.token}"\n') + f.write(f'expiration = "{user_token.expiration}"\n') + + +@cache +def default_token_collection() -> UserTokenCollection: + """The default token collection, created from the `~/.logfire/default.toml` file.""" + return UserTokenCollection.from_tokens_file(DEFAULT_FILE) + + class NewDeviceFlow(TypedDict): """Matches model of the same name in the backend.""" @@ -91,17 +223,3 @@ def poll_for_token(session: requests.Session, device_code: str, base_api_url: st opt_user_token: UserTokenData | None = res.json() if opt_user_token: return opt_user_token - - -def is_logged_in(data: DefaultFile, logfire_url: str) -> bool: - """Check if the user is logged in. - - Returns: - True if the user is logged in, False otherwise. - """ - for url, info in data['tokens'].items(): # pragma: no branch - # token expirations are in UTC - expiry_date = datetime.fromisoformat(info['expiration'].rstrip('Z')).replace(tzinfo=timezone.utc) - if url == logfire_url and datetime.now(tz=timezone.utc) < expiry_date: # pragma: no branch - return True - return False # pragma: no cover From adad73e07808718f144f49739eabe3d0ce81be66 Mon Sep 17 00:00:00 2001 From: Viicos <65306057+Viicos@users.noreply.github.com> Date: Thu, 3 Apr 2025 18:35:22 +0200 Subject: [PATCH 4/6] Make use of the new client and token structure in config Most of the changes are in the `LogfireCredentials` class, where we don't pass in base URLs, tokens and sessions anymore. Any API-only related operations are defined on the new Logfire client, and `LogfireCredentials` now only has the actual functional logic (with user prompts, etc). --- logfire/_internal/config.py | 211 +++++++----------------------------- 1 file changed, 41 insertions(+), 170 deletions(-) diff --git a/logfire/_internal/config.py b/logfire/_internal/config.py index 9d360e44..a0eed307 100644 --- a/logfire/_internal/config.py +++ b/logfire/_internal/config.py @@ -13,7 +13,7 @@ from dataclasses import dataclass, field from pathlib import Path from threading import RLock, Thread -from typing import TYPE_CHECKING, Any, Callable, Literal, Sequence, TypedDict, cast +from typing import TYPE_CHECKING, Any, Callable, Literal, Sequence, TypedDict from urllib.parse import urljoin from uuid import uuid4 @@ -56,7 +56,7 @@ from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio, Sampler from opentelemetry.semconv.resource import ResourceAttributes from rich.console import Console -from rich.prompt import Confirm, IntPrompt, Prompt +from rich.prompt import Confirm, Prompt from typing_extensions import Self, Unpack from logfire.exceptions import LogfireConfigError @@ -65,7 +65,8 @@ from logfire.version import VERSION from ..propagate import NoExtractTraceContextPropagator, WarnOnExtractTraceContextPropagator -from .auth import DEFAULT_FILE, DefaultFile, is_logged_in +from .auth import default_token_collection +from .client import InvalidProjectName, LogfireClient, ProjectAlreadyExists from .config_params import ParamManager, PydanticPluginRecordValues from .constants import ( RESOURCE_ATTRIBUTES_CODE_ROOT_PATH, @@ -102,11 +103,9 @@ from .tracer import OPEN_SPANS, PendingSpanProcessor, ProxyTracerProvider from .utils import ( SeededRandomIdGenerator, - UnexpectedResponse, ensure_data_dir_exists, handle_internal_errors, platform_is_emscripten, - read_toml_file, suppress_instrumentation, ) @@ -898,10 +897,9 @@ def add_span_processor(span_processor: SpanProcessor) -> None: # if we still don't have a token, try initializing a new project and writing a new creds file # note, we only do this if `send_to_logfire` is explicitly `True`, not 'if-token-present' if self.send_to_logfire is True and credentials is None: - credentials = LogfireCredentials.initialize_project( - logfire_api_url=self.advanced.base_url, - session=requests.Session(), - ) + user_token = default_token_collection().get_token(self.advanced.base_url) + client = LogfireClient(user_token) + credentials = LogfireCredentials.initialize_project(client=client) credentials.write_creds_file(self.data_dir) if credentials is not None: @@ -1339,96 +1337,12 @@ def from_token(cls, token: str, session: requests.Session, base_url: str) -> Sel logfire_api_url=base_url, ) - @classmethod - def _get_user_token_data(cls, logfire_api_url: str | None = None) -> tuple[str, str]: - """Get a token and its associated base API URL. - - Args: - logfire_api_url: An explicitly configured base API URL. If set, the token attached - to this URL will be used. If not provided, will prompt for a token to use if multiple - ones are available, or use the only one available otherwise. - - Returns: - A two-tuple, the first element being the token and the second element being the base API URL. - """ - if DEFAULT_FILE.is_file(): - data = cast(DefaultFile, read_toml_file(DEFAULT_FILE)) - if logfire_api_url is None: - tokens_list = list(data['tokens'].items()) - if len(tokens_list) == 1: - return cls._get_user_token_data(tokens_list[0][0]) - elif len(tokens_list) >= 2: # pragma: no branch - choices_str = '\n'.join( - f'{i}. {_get_token_repr(url, d["token"])}' for i, (url, d) in enumerate(tokens_list, start=1) - ) - int_choice = IntPrompt.ask( - f'Multiple user tokens found. Please select one:\n{choices_str}\n', - choices=[str(i) for i in range(1, len(data['tokens']) + 1)], - ) - url, token_data = tokens_list[int_choice - 1] - if is_logged_in(data, url): # pragma: no branch - return token_data['token'], url - elif is_logged_in(data, logfire_api_url): - return data['tokens'][logfire_api_url]['token'], logfire_api_url - - raise LogfireConfigError( - """You are not authenticated. Please run `logfire auth` to authenticate. - -If you are running in production, you can set the `LOGFIRE_TOKEN` environment variable. -To create a write token, refer to https://logfire.pydantic.dev/docs/guides/advanced/creating_write_tokens/ -""" - ) - - @classmethod - def get_current_user(cls, session: requests.Session, logfire_api_url: str | None = None) -> dict[str, Any] | None: - try: - user_token, logfire_api_url = cls._get_user_token_data(logfire_api_url=logfire_api_url) - except LogfireConfigError: - return None - return cls._get_user_for_token(user_token, session, logfire_api_url) - - @classmethod - def _get_user_for_token(cls, user_token: str, session: requests.Session, logfire_api_url: str) -> dict[str, Any]: - headers = {**COMMON_REQUEST_HEADERS, 'Authorization': user_token} - account_info_url = urljoin(logfire_api_url, '/v1/account/me') - try: - response = session.get(account_info_url, headers=headers) - UnexpectedResponse.raise_for_status(response) - except requests.RequestException as e: - raise LogfireConfigError('Error retrieving user information.') from e - return response.json() - - @classmethod - def get_user_projects(cls, session: requests.Session, logfire_api_url: str | None = None) -> list[dict[str, Any]]: - """Get list of projects that user has access to them. - - Args: - session: HTTP client session used to communicate with the Logfire API. - logfire_api_url: The Logfire API base URL. - - Returns: - List of user projects. - - Raises: - LogfireConfigError: If there was an error retrieving user projects. - """ - user_token, logfire_api_url = cls._get_user_token_data(logfire_api_url=logfire_api_url) - headers = {**COMMON_REQUEST_HEADERS, 'Authorization': user_token} - projects_url = urljoin(logfire_api_url, '/v1/projects/') - try: - response = session.get(projects_url, headers=headers) - UnexpectedResponse.raise_for_status(response) - except requests.RequestException as e: # pragma: no cover - raise LogfireConfigError('Error retrieving list of projects.') from e - return response.json() - @classmethod def use_existing_project( cls, *, - session: requests.Session, + client: LogfireClient, projects: list[dict[str, Any]], - logfire_api_url: str | None = None, organization: str | None = None, project_name: str | None = None, ) -> dict[str, Any] | None: @@ -1438,8 +1352,7 @@ def use_existing_project( the user has access to it. Otherwise, it asks the user to select a project interactively. Args: - session: HTTP client session used to communicate with the Logfire API. - logfire_api_url: The Logfire API base URL. + client: The Logfire client to use when making requests. projects: List of user projects. organization: Project organization. project_name: Name of project that has to be used. @@ -1450,9 +1363,6 @@ def use_existing_project( Raises: LogfireConfigError: If there was an error configuring the project. """ - user_token, logfire_api_url = cls._get_user_token_data(logfire_api_url=logfire_api_url) - headers = {**COMMON_REQUEST_HEADERS, 'Authorization': user_token} - org_message = '' org_flag = '' project_message = 'projects' @@ -1520,24 +1430,13 @@ def use_existing_project( organization = project_info_tuple[0] project_name = project_info_tuple[1] - project_write_token_url = urljoin( - logfire_api_url, - f'/v1/organizations/{organization}/projects/{project_name}/write-tokens/', - ) - try: - response = session.post(project_write_token_url, headers=headers) - UnexpectedResponse.raise_for_status(response) - except requests.RequestException as e: - raise LogfireConfigError('Error creating project write token.') from e - - return response.json() + return client.create_write_token(organization, project_name) @classmethod def create_new_project( cls, *, - session: requests.Session, - logfire_api_url: str | None = None, + client: LogfireClient, organization: str | None = None, default_organization: bool = False, project_name: str | None = None, @@ -1548,8 +1447,7 @@ def create_new_project( Otherwise, it asks the user to select organization and enter a valid project name interactively. Args: - session: HTTP client session used to communicate with the Logfire API. - logfire_api_url: The Logfire API base URL. + client: The Logfire client to use when making requests. organization: The organization name of the new project. default_organization: Whether to create the project under the user default organization. project_name: The default name of the project. @@ -1560,24 +1458,15 @@ def create_new_project( Raises: LogfireConfigError: If there was an error creating projects. """ - user_token, logfire_api_url = cls._get_user_token_data(logfire_api_url=logfire_api_url) - headers = {**COMMON_REQUEST_HEADERS, 'Authorization': user_token} - - # Get user organizations - organizations_url = urljoin(logfire_api_url, '/v1/organizations/') - try: - response = session.get(organizations_url, headers=headers) - UnexpectedResponse.raise_for_status(response) - except requests.RequestException as e: - raise LogfireConfigError('Error retrieving list of organizations.') from e - organizations = [item['organization_name'] for item in response.json()] + organizations: list[str] = [item['organization_name'] for item in client.get_user_organizations()] if organization not in organizations: if len(organizations) > 1: # Get user default organization - user_details = cls._get_user_for_token(user_token, session, logfire_api_url) - assert user_details is not None - user_default_organization_name = user_details.get('default_organization', {}).get('organization_name') + user_details = client.get_user_information() + user_default_organization_name: str | None = user_details.get('default_organization', {}).get( + 'organization_name' + ) if default_organization and user_default_organization_name: organization = user_default_organization_name @@ -1586,7 +1475,7 @@ def create_new_project( '\nTo create and use a new project, please provide the following information:\n' 'Select the organization to create the project in', choices=organizations, - default=user_default_organization_name if user_default_organization_name else organizations[0], + default=user_default_organization_name or organizations[0], ) else: organization = organizations[0] @@ -1597,7 +1486,7 @@ def create_new_project( if not confirm: sys.exit(1) - project_name_default: str | None = default_project_name() + project_name_default: str = default_project_name() project_name_prompt = 'Enter the project name' while True: project_name = project_name or Prompt.ask(project_name_prompt, default=project_name_default) @@ -1611,46 +1500,35 @@ def create_new_project( default=project_name_default, ) - url = urljoin(logfire_api_url, f'/v1/projects/{organization}') try: - response = session.post(url, headers=headers, json={'project_name': project_name}) - if response.status_code == 409: - project_name_default = ... # type: ignore # this means the value is required - project_name_prompt = ( - f"\nA project with the name '{project_name}' already exists." - f' Please enter a different project name' - ) - project_name = None - continue - if response.status_code == 422: - error = response.json()['detail'][0] - if error['loc'] == ['body', 'project_name']: # pragma: no branch - project_name_default = ... # type: ignore # this means the value is required - project_name_prompt = ( - f'\nThe project name you entered is invalid:\n' - f'{error["msg"]}\n' - f'Please enter a different project name' - ) - project_name = None - continue - UnexpectedResponse.raise_for_status(response) - except requests.RequestException as e: - raise LogfireConfigError('Error creating new project.') from e + project = client.create_new_project(organization, project_name) + except ProjectAlreadyExists: + project_name_default = ... # type: ignore # this means the value is required + project_name_prompt = ( + f"\nA project with the name '{project_name}' already exists. Please enter a different project name" + ) + project_name = None + continue + except InvalidProjectName as exc: + project_name_default = ... # type: ignore # this means the value is required + project_name_prompt = ( + f'\nThe project name you entered is invalid:\n{exc.reason}\nPlease enter a different project name' + ) + project_name = None + continue else: - return response.json() + return project @classmethod def initialize_project( cls, *, - session: requests.Session, - logfire_api_url: str | None = None, + client: LogfireClient, ) -> Self: """Create a new project or use an existing project on logfire.dev requesting the given project name. Args: - session: HTTP client session used to communicate with the Logfire API. - logfire_api_url: The Logfire API base URL. + client: The Logfire client to use when making requests. Returns: The new credentials. @@ -1665,24 +1543,17 @@ def initialize_project( 'All data sent to Logfire must be associated with a project.\n' ) - _, logfire_api_url = cls._get_user_token_data(logfire_api_url=logfire_api_url) - - projects = cls.get_user_projects(session=session, logfire_api_url=logfire_api_url) + projects = client.get_user_projects() if projects: use_existing_projects = Confirm.ask('Do you want to use one of your existing projects? ', default=True) if use_existing_projects: # pragma: no branch - credentials = cls.use_existing_project( - session=session, logfire_api_url=logfire_api_url, projects=projects - ) + credentials = cls.use_existing_project(client=client, projects=projects) if not credentials: - credentials = cls.create_new_project( - session=session, - logfire_api_url=logfire_api_url, - ) + credentials = cls.create_new_project(client=client) try: - result = cls(**credentials, logfire_api_url=logfire_api_url) + result = cls(**credentials, logfire_api_url=client.base_url) Prompt.ask( f'Project initialized successfully. You will be able to view it at: {result.project_url}\n' 'Press Enter to continue' From 48eb4d0df8f4e048e77090ff4ec4defdac420a23 Mon Sep 17 00:00:00 2001 From: Viicos <65306057+Viicos@users.noreply.github.com> Date: Thu, 3 Apr 2025 18:37:41 +0200 Subject: [PATCH 5/6] Make use of the new client and token structure in the CLI --- logfire/_internal/cli.py | 68 +++++++++++++++++++++------------------- 1 file changed, 35 insertions(+), 33 deletions(-) diff --git a/logfire/_internal/cli.py b/logfire/_internal/cli.py index 89be77d1..716b5b8f 100644 --- a/logfire/_internal/cli.py +++ b/logfire/_internal/cli.py @@ -13,7 +13,7 @@ import webbrowser from operator import itemgetter from pathlib import Path -from typing import Any, Sequence, cast +from typing import Any, Sequence from urllib.parse import urlparse import requests @@ -23,11 +23,18 @@ from logfire.propagate import ContextCarrier, get_context from ..version import VERSION -from .auth import DEFAULT_FILE, HOME_LOGFIRE, DefaultFile, is_logged_in, poll_for_token, request_device_code +from .auth import ( + DEFAULT_FILE, + HOME_LOGFIRE, + UserTokenCollection, + default_token_collection, + poll_for_token, + request_device_code, +) +from .client import LogfireClient from .config import REGIONS, LogfireCredentials, get_base_url_from_token from .config_params import ParamManager from .tracer import SDKTracerProvider -from .utils import read_toml_file BASE_OTEL_INTEGRATION_URL = 'https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/' BASE_DOCS_URL = 'https://logfire.pydantic.dev/docs' @@ -197,16 +204,14 @@ def parse_auth(args: argparse.Namespace) -> None: This will authenticate your machine with Logfire and store the credentials. """ - logfire_url = args.logfire_url + logfire_url: str | None = args.logfire_url + if DEFAULT_FILE.is_file(): - data = cast(DefaultFile, read_toml_file(DEFAULT_FILE)) + tokens_collection = default_token_collection() else: - data: DefaultFile = {'tokens': {}} + tokens_collection = UserTokenCollection.from_tokens({'tokens': {}}) - if logfire_url: - logged_in = is_logged_in(data, logfire_url) - else: - logged_in = any(is_logged_in(data, url) for url in data['tokens']) + logged_in = tokens_collection.is_logged_in(logfire_url) if logged_in: sys.stderr.writelines( @@ -255,22 +260,20 @@ def parse_auth(args: argparse.Namespace) -> None: ) ) - data['tokens'][logfire_url] = poll_for_token(args._session, device_code, logfire_url) + tokens_collection.add_token(logfire_url, poll_for_token(args._session, device_code, logfire_url)) sys.stderr.write('Successfully authenticated!\n') - # There's no standard library package to write TOML files, so we'll write it manually. - with DEFAULT_FILE.open('w') as f: - for url, info in data['tokens'].items(): - f.write(f'[tokens."{url}"]\n') - f.write(f'token = "{info["token"]}"\n') - f.write(f'expiration = "{info["expiration"]}"\n') - + tokens_collection.dump(DEFAULT_FILE) sys.stderr.write(f'\nYour Logfire credentials are stored in {DEFAULT_FILE}\n') def parse_list_projects(args: argparse.Namespace) -> None: """List user projects.""" - projects = LogfireCredentials.get_user_projects(session=args._session, logfire_api_url=args.logfire_url) + logfire_url: str | None = args.logfire_url + user_token = default_token_collection().get_token(logfire_url) + client = LogfireClient(user_token) + + projects = client.get_user_projects() if projects: sys.stderr.write( _pretty_table( @@ -299,42 +302,41 @@ def _write_credentials(project_info: dict[str, Any], data_dir: Path, logfire_api def parse_create_new_project(args: argparse.Namespace) -> None: """Create a new project.""" data_dir = Path(args.data_dir) - logfire_url = args.logfire_url - if logfire_url is None: # pragma: no cover - _, logfire_url = LogfireCredentials._get_user_token_data() # type: ignore + logfire_url: str | None = args.logfire_url + user_token = default_token_collection().get_token(logfire_url) + client = LogfireClient(user_token) + project_name = args.project_name organization = args.org default_organization = args.default_org project_info = LogfireCredentials.create_new_project( - session=args._session, - logfire_api_url=logfire_url, + client=client, organization=organization, default_organization=default_organization, project_name=project_name, ) - credentials = _write_credentials(project_info, data_dir, logfire_url) + credentials = _write_credentials(project_info, data_dir, user_token.base_url) sys.stderr.write(f'Project created successfully. You will be able to view it at: {credentials.project_url}\n') def parse_use_project(args: argparse.Namespace) -> None: """Use an existing project.""" data_dir = Path(args.data_dir) - logfire_url = args.logfire_url - if logfire_url is None: # pragma: no cover - _, logfire_url = LogfireCredentials._get_user_token_data() # type: ignore + logfire_url: str | None = args.logfire_url + user_token = default_token_collection().get_token(logfire_url) + client = LogfireClient(user_token) + project_name = args.project_name organization = args.org - - projects = LogfireCredentials.get_user_projects(session=args._session, logfire_api_url=logfire_url) + projects = client.get_user_projects() project_info = LogfireCredentials.use_existing_project( - session=args._session, - logfire_api_url=logfire_url, + client=client, projects=projects, organization=organization, project_name=project_name, ) if project_info: - credentials = _write_credentials(project_info, data_dir, logfire_url) + credentials = _write_credentials(project_info, data_dir, client.base_url) sys.stderr.write( f'Project configured successfully. You will be able to view it at: {credentials.project_url}\n' ) From cf0860fae646ac20da76ebda21b5287254f24c3c Mon Sep 17 00:00:00 2001 From: Viicos <65306057+Viicos@users.noreply.github.com> Date: Wed, 30 Apr 2025 15:11:09 +0200 Subject: [PATCH 6/6] Partial feedback --- logfire/_internal/auth.py | 80 +++++++++++++++++++++++-------------- logfire/_internal/cli.py | 13 +++--- logfire/_internal/client.py | 28 ++++++++++--- logfire/_internal/config.py | 21 +--------- 4 files changed, 79 insertions(+), 63 deletions(-) diff --git a/logfire/_internal/auth.py b/logfire/_internal/auth.py index 1b6e18be..0f558208 100644 --- a/logfire/_internal/auth.py +++ b/logfire/_internal/auth.py @@ -1,6 +1,7 @@ from __future__ import annotations import platform +import re import sys import warnings from dataclasses import dataclass @@ -30,6 +31,29 @@ """File used to store user tokens.""" +PYDANTIC_LOGFIRE_TOKEN_PATTERN = re.compile( + r'^(?Ppylf_v(?P[0-9]+)_(?P[a-z]+)_)(?P[a-zA-Z0-9]+)$' +) + + +class _RegionData(TypedDict): + base_url: str + gcp_region: str + + +REGIONS: dict[str, _RegionData] = { + 'us': { + 'base_url': 'https://logfire-us.pydantic.dev', + 'gcp_region': 'us-east4', + }, + 'eu': { + 'base_url': 'https://logfire-eu.pydantic.dev', + 'gcp_region': 'europe-west4', + }, +} +"""The existing Logfire regions.""" + + class UserTokenData(TypedDict): """User token data.""" @@ -37,7 +61,7 @@ class UserTokenData(TypedDict): expiration: str -class TokensFileData(TypedDict): +class UserTokensFileData(TypedDict): """Content of the file containing the user tokens.""" tokens: dict[str, UserTokenData] @@ -87,43 +111,46 @@ def __str__(self) -> str: @dataclass class UserTokenCollection: """A collection of user tokens.""" - user_tokens: list[UserToken] + + user_tokens: dict[str, UserToken] + """A mapping between base URLs and user tokens.""" @classmethod - def from_tokens(cls, tokens: TokensFileData) -> Self: - return cls( - user_tokens=[ - UserToken.from_user_token_data(url, data) # pyright: ignore[reportArgumentType], waiting for PEP 728 - for url, data in tokens.items() - ] - ) + def empty(cls) -> Self: + """Create an empty user token collection.""" + return cls(user_tokens={}) + + @classmethod + def from_file_data(cls, file_data: UserTokensFileData) -> Self: + return cls(user_tokens={url: UserToken(base_url=url, **data) for url, data in file_data['tokens'].items()}) @classmethod def from_tokens_file(cls, file: Path) -> Self: - return cls.from_tokens(cast(TokensFileData, read_toml_file(file))) + return cls.from_file_data(cast(UserTokensFileData, read_toml_file(file))) def get_token(self, base_url: str | None = None) -> UserToken: + tokens_list = list(self.user_tokens.values()) if base_url is not None: - token = next((t for t in self.user_tokens if t.base_url == base_url), None) + token = next((t for t in tokens_list if t.base_url == base_url), None) if token is None: raise LogfireConfigError( f'No user token was found matching the {base_url} Logfire URL. ' 'Please run `logfire auth` to authenticate.' ) else: - if len(self.user_tokens) == 1: - token = self.user_tokens[0] - elif len(self.user_tokens) >= 2: + if len(tokens_list) == 1: + token = tokens_list[0] + elif len(tokens_list) >= 2: choices_str = '\n'.join( f'{i}. {token} ({"expired" if token.is_expired else "valid"})' - for i, token in enumerate(self.user_tokens, start=1) + for i, token in enumerate(tokens_list, start=1) ) int_choice = IntPrompt.ask( f'Multiple user tokens found. Please select one:\n{choices_str}\n', - choices=[str(i) for i in range(1, len(self.user_tokens) + 1)], + choices=[str(i) for i in range(1, len(tokens_list) + 1)], ) - token = self.user_tokens[int_choice - 1] - else: # self.user_tokens == [] + token = tokens_list[int_choice - 1] + else: # tokens_list == [] raise LogfireConfigError('No user tokens are available. Please run `logfire auth` to authenticate.') if token.is_expired: @@ -132,28 +159,21 @@ def get_token(self, base_url: str | None = None) -> UserToken: def is_logged_in(self, base_url: str | None = None) -> bool: if base_url is not None: - tokens = (t for t in self.user_tokens if t.base_url == base_url) + tokens = (t for t in self.user_tokens.values() if t.base_url == base_url) else: - tokens = self.user_tokens + tokens = self.user_tokens.values() return any(not t.is_expired for t in tokens) def add_token(self, base_url: str, token: UserTokenData) -> UserToken: - existing_token = next((t for t in self.user_tokens if t.base_url == base_url), None) - if existing_token: - token_index = self.user_tokens.index(existing_token) - self.user_tokens.remove(existing_token) - else: - token_index = len(self.user_tokens) - user_token = UserToken.from_user_token_data(base_url, token) - self.user_tokens.insert(token_index, user_token) + self.user_tokens[base_url] = UserToken.from_user_token_data(base_url, token) return user_token def dump(self, path: Path) -> None: # There's no standard library package to write TOML files, so we'll write it manually. with path.open('w') as f: - for user_token in self.user_tokens: - f.write(f'[tokens."{user_token.base_url}"]\n') + for base_url, user_token in self.user_tokens.items(): + f.write(f'[tokens."{base_url}"]\n') f.write(f'token = "{user_token.token}"\n') f.write(f'expiration = "{user_token.expiration}"\n') diff --git a/logfire/_internal/cli.py b/logfire/_internal/cli.py index 716b5b8f..01747537 100644 --- a/logfire/_internal/cli.py +++ b/logfire/_internal/cli.py @@ -209,7 +209,7 @@ def parse_auth(args: argparse.Namespace) -> None: if DEFAULT_FILE.is_file(): tokens_collection = default_token_collection() else: - tokens_collection = UserTokenCollection.from_tokens({'tokens': {}}) + tokens_collection = UserTokenCollection.empty() logged_in = tokens_collection.is_logged_in(logfire_url) @@ -270,8 +270,7 @@ def parse_auth(args: argparse.Namespace) -> None: def parse_list_projects(args: argparse.Namespace) -> None: """List user projects.""" logfire_url: str | None = args.logfire_url - user_token = default_token_collection().get_token(logfire_url) - client = LogfireClient(user_token) + client = LogfireClient.from_url(logfire_url) projects = client.get_user_projects() if projects: @@ -303,8 +302,7 @@ def parse_create_new_project(args: argparse.Namespace) -> None: """Create a new project.""" data_dir = Path(args.data_dir) logfire_url: str | None = args.logfire_url - user_token = default_token_collection().get_token(logfire_url) - client = LogfireClient(user_token) + client = LogfireClient.from_url(logfire_url) project_name = args.project_name organization = args.org @@ -315,7 +313,7 @@ def parse_create_new_project(args: argparse.Namespace) -> None: default_organization=default_organization, project_name=project_name, ) - credentials = _write_credentials(project_info, data_dir, user_token.base_url) + credentials = _write_credentials(project_info, data_dir, client.base_url) sys.stderr.write(f'Project created successfully. You will be able to view it at: {credentials.project_url}\n') @@ -323,8 +321,7 @@ def parse_use_project(args: argparse.Namespace) -> None: """Use an existing project.""" data_dir = Path(args.data_dir) logfire_url: str | None = args.logfire_url - user_token = default_token_collection().get_token(logfire_url) - client = LogfireClient(user_token) + client = LogfireClient.from_url(logfire_url) project_name = args.project_name organization = args.org diff --git a/logfire/_internal/client.py b/logfire/_internal/client.py index c8317075..5e170ed6 100644 --- a/logfire/_internal/client.py +++ b/logfire/_internal/client.py @@ -1,19 +1,17 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any +from typing import Any from urllib.parse import urljoin from requests import Response, Session +from typing_extensions import Self from logfire.exceptions import LogfireConfigError from logfire.version import VERSION +from .auth import UserToken, UserTokenCollection, default_token_collection from .utils import UnexpectedResponse -if TYPE_CHECKING: - from .auth import UserToken - - UA_HEADER = f'logfire/{VERSION}' @@ -27,6 +25,12 @@ def __init__(self, reason: str, /) -> None: class LogfireClient: + """A Logfire HTTP client to interact with the API. + + Args: + user_token: The user token to use when authenticating against the API. + """ + def __init__(self, user_token: UserToken) -> None: if user_token.is_expired: raise RuntimeError @@ -35,6 +39,20 @@ def __init__(self, user_token: UserToken) -> None: self._session = Session() self._session.headers.update({'Authorization': self._token, 'User-Agent': UA_HEADER}) + @classmethod + def from_url(cls, base_url: str | None, token_collection: UserTokenCollection | None = None) -> Self: + """Create a client from the provided base URL. + + Args: + base_url: The base URL to use when looking for a user token. If `None`, will prompt + the user into selecting a token from the token collection (or, if only one available, + use it directly). + token_collection: The token collection to use when looking for the user token. Defaults + to the default token collection from `~/.logfire/default.toml`. + """ + token_collection = token_collection or default_token_collection() + return cls(user_token=token_collection.get_token(base_url)) + def _get(self, endpoint: str) -> Response: response = self._session.get(urljoin(self.base_url, endpoint)) UnexpectedResponse.raise_for_status(response) diff --git a/logfire/_internal/config.py b/logfire/_internal/config.py index a0eed307..f89ed289 100644 --- a/logfire/_internal/config.py +++ b/logfire/_internal/config.py @@ -65,7 +65,6 @@ from logfire.version import VERSION from ..propagate import NoExtractTraceContextPropagator, WarnOnExtractTraceContextPropagator -from .auth import default_token_collection from .client import InvalidProjectName, LogfireClient, ProjectAlreadyExists from .config_params import ParamManager, PydanticPluginRecordValues from .constants import ( @@ -897,8 +896,7 @@ def add_span_processor(span_processor: SpanProcessor) -> None: # if we still don't have a token, try initializing a new project and writing a new creds file # note, we only do this if `send_to_logfire` is explicitly `True`, not 'if-token-present' if self.send_to_logfire is True and credentials is None: - user_token = default_token_collection().get_token(self.advanced.base_url) - client = LogfireClient(user_token) + client = LogfireClient.from_url(self.advanced.base_url) credentials = LogfireCredentials.initialize_project(client=client) credentials.write_creds_file(self.data_dir) @@ -1596,23 +1594,6 @@ def _get_creds_file(creds_dir: Path) -> Path: return creds_dir / CREDENTIALS_FILENAME -def _get_token_repr(url: str, token: str) -> str: - region = 'us' - if match := PYDANTIC_LOGFIRE_TOKEN_PATTERN.match(token): - region = match.group('region') - if region not in REGIONS: - region = 'us' - - token_repr = f'{region.upper()} ({url}) - ' - if match: - # new_token, include prefix and 5 chars - token_repr += match.group('safe_part') + match.group('token')[:5] - else: - token_repr += token[:5] - token_repr += '****' - return token_repr - - def get_base_url_from_token(token: str) -> str: """Get the base API URL from the token's region.""" # default to US for tokens that were created before regions were added: