1
1
from __future__ import annotations
2
2
3
3
import platform
4
+ import re
4
5
import sys
5
6
import warnings
6
7
from dataclasses import dataclass
30
31
"""File used to store user tokens."""
31
32
32
33
34
+ PYDANTIC_LOGFIRE_TOKEN_PATTERN = re .compile (
35
+ r'^(?P<safe_part>pylf_v(?P<version>[0-9]+)_(?P<region>[a-z]+)_)(?P<token>[a-zA-Z0-9]+)$'
36
+ )
37
+
38
+
39
+ class _RegionData (TypedDict ):
40
+ base_url : str
41
+ gcp_region : str
42
+
43
+
44
+ REGIONS : dict [str , _RegionData ] = {
45
+ 'us' : {
46
+ 'base_url' : 'https://logfire-us.pydantic.dev' ,
47
+ 'gcp_region' : 'us-east4' ,
48
+ },
49
+ 'eu' : {
50
+ 'base_url' : 'https://logfire-eu.pydantic.dev' ,
51
+ 'gcp_region' : 'europe-west4' ,
52
+ },
53
+ }
54
+ """The existing Logfire regions."""
55
+
56
+
33
57
class UserTokenData (TypedDict ):
34
58
"""User token data."""
35
59
36
60
token : str
37
61
expiration : str
38
62
39
63
40
- class TokensFileData (TypedDict ):
64
+ class UserTokensFileData (TypedDict ):
41
65
"""Content of the file containing the user tokens."""
42
66
43
67
tokens : dict [str , UserTokenData ]
@@ -87,43 +111,46 @@ def __str__(self) -> str:
87
111
@dataclass
88
112
class UserTokenCollection :
89
113
"""A collection of user tokens."""
90
- user_tokens : list [UserToken ]
114
+
115
+ user_tokens : dict [str , UserToken ]
116
+ """A mapping between base URLs and user tokens."""
91
117
92
118
@classmethod
93
- def from_tokens (cls , tokens : TokensFileData ) -> Self :
94
- return cls (
95
- user_tokens = [
96
- UserToken . from_user_token_data ( url , data ) # pyright: ignore[reportArgumentType], waiting for PEP 728
97
- for url , data in tokens . items ()
98
- ]
99
- )
119
+ def empty (cls ) -> Self :
120
+ """Create an empty user token collection."""
121
+ return cls ( user_tokens = {})
122
+
123
+ @ classmethod
124
+ def from_file_data ( cls , file_data : UserTokensFileData ) -> Self :
125
+ return cls ( user_tokens = { url : UserToken ( base_url = url , ** data ) for url , data in file_data [ 'tokens' ]. items ()} )
100
126
101
127
@classmethod
102
128
def from_tokens_file (cls , file : Path ) -> Self :
103
- return cls .from_tokens (cast (TokensFileData , read_toml_file (file )))
129
+ return cls .from_file_data (cast (UserTokensFileData , read_toml_file (file )))
104
130
105
131
def get_token (self , base_url : str | None = None ) -> UserToken :
132
+ tokens_list = list (self .user_tokens .values ())
106
133
if base_url is not None :
107
- token = next ((t for t in self . user_tokens if t .base_url == base_url ), None )
134
+ token = next ((t for t in tokens_list if t .base_url == base_url ), None )
108
135
if token is None :
109
136
raise LogfireConfigError (
110
137
f'No user token was found matching the { base_url } Logfire URL. '
111
138
'Please run `logfire auth` to authenticate.'
112
139
)
113
140
else :
114
- if len (self . user_tokens ) == 1 :
115
- token = self . user_tokens [0 ]
116
- elif len (self . user_tokens ) >= 2 :
141
+ if len (tokens_list ) == 1 :
142
+ token = tokens_list [0 ]
143
+ elif len (tokens_list ) >= 2 :
117
144
choices_str = '\n ' .join (
118
145
f'{ i } . { token } ({ "expired" if token .is_expired else "valid" } )'
119
- for i , token in enumerate (self . user_tokens , start = 1 )
146
+ for i , token in enumerate (tokens_list , start = 1 )
120
147
)
121
148
int_choice = IntPrompt .ask (
122
149
f'Multiple user tokens found. Please select one:\n { choices_str } \n ' ,
123
- choices = [str (i ) for i in range (1 , len (self . user_tokens ) + 1 )],
150
+ choices = [str (i ) for i in range (1 , len (tokens_list ) + 1 )],
124
151
)
125
- token = self . user_tokens [int_choice - 1 ]
126
- else : # self.user_tokens == []
152
+ token = tokens_list [int_choice - 1 ]
153
+ else : # tokens_list == []
127
154
raise LogfireConfigError ('No user tokens are available. Please run `logfire auth` to authenticate.' )
128
155
129
156
if token .is_expired :
@@ -132,28 +159,21 @@ def get_token(self, base_url: str | None = None) -> UserToken:
132
159
133
160
def is_logged_in (self , base_url : str | None = None ) -> bool :
134
161
if base_url is not None :
135
- tokens = (t for t in self .user_tokens if t .base_url == base_url )
162
+ tokens = (t for t in self .user_tokens . values () if t .base_url == base_url )
136
163
else :
137
- tokens = self .user_tokens
164
+ tokens = self .user_tokens . values ()
138
165
return any (not t .is_expired for t in tokens )
139
166
140
167
def add_token (self , base_url : str , token : UserTokenData ) -> UserToken :
141
- existing_token = next ((t for t in self .user_tokens if t .base_url == base_url ), None )
142
- if existing_token :
143
- token_index = self .user_tokens .index (existing_token )
144
- self .user_tokens .remove (existing_token )
145
- else :
146
- token_index = len (self .user_tokens )
147
-
148
168
user_token = UserToken .from_user_token_data (base_url , token )
149
- self .user_tokens . insert ( token_index , user_token )
169
+ self .user_tokens [ base_url ] = UserToken . from_user_token_data ( base_url , token )
150
170
return user_token
151
171
152
172
def dump (self , path : Path ) -> None :
153
173
# There's no standard library package to write TOML files, so we'll write it manually.
154
174
with path .open ('w' ) as f :
155
- for user_token in self .user_tokens :
156
- f .write (f'[tokens."{ user_token . base_url } "]\n ' )
175
+ for base_url , user_token in self .user_tokens . items () :
176
+ f .write (f'[tokens."{ base_url } "]\n ' )
157
177
f .write (f'token = "{ user_token .token } "\n ' )
158
178
f .write (f'expiration = "{ user_token .expiration } "\n ' )
159
179
0 commit comments