Skip to content

Commit

Permalink
[WIP] migrate LoginHandler config to IdentityProvider
Browse files Browse the repository at this point in the history
now that we have a configurable class to represent identity, most of the LoginHandler's custom methods belong there
  • Loading branch information
minrk committed May 4, 2022
1 parent 321f5a9 commit 136dc59
Show file tree
Hide file tree
Showing 4 changed files with 337 additions and 75 deletions.
316 changes: 293 additions & 23 deletions jupyter_server/auth/identity.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,25 @@
.. versionadded:: 2.0
"""
from __future__ import annotations

import binascii
import os
import re
import uuid
from dataclasses import asdict, dataclass
from typing import Any, Optional
from typing import TYPE_CHECKING, Any, Dict, cast

from tornado.web import RequestHandler
from tornado import web
from traitlets import Type, Unicode, default
from traitlets.config import LoggingConfigurable

# from dataclasses import field
from jupyter_server.transutils import _i18n

# circular imports for type checking
if TYPE_CHECKING:
from jupyter_server.base.handlers import JupyterHandler
from jupyter_server.serverapp import ServerApp


@dataclass
Expand All @@ -31,9 +43,9 @@ class User:
display_name: str = ""

# these fields are left as None if undefined
initials: Optional[str] = None
avatar_url: Optional[str] = None
color: Optional[str] = None
initials: str | None = None
avatar_url: str | None = None
color: str | None = None

# TODO: extension fields?
# ext: Dict[str, Dict[str, Any]] = field(default_factory=dict)
Expand All @@ -59,9 +71,6 @@ def fill_defaults(self):
if not self.display_name:
self.display_name = self.name

def to_dict(self):
pass


def _backward_compat_user(got_user: Any) -> User:
"""Backward-compatibility for LoginHandler.get_user
Expand Down Expand Up @@ -93,9 +102,7 @@ def _backward_compat_user(got_user: Any) -> User:

class IdentityProvider(LoggingConfigurable):
"""
Interface for providing identity
_may_ be a coroutine.
Interface for providing identity management and authentication.
Two principle methods:
Expand All @@ -105,27 +112,103 @@ class IdentityProvider(LoggingConfigurable):
The default is to use :py:meth:`dataclasses.asdict`,
and usually shouldn't need override.
Additional methods can customize authentication.
.. versionadded:: 2.0
"""

def get_user(self, handler: RequestHandler) -> User:
cookie_name = Unicode(config=True)

token = Unicode(
"<generated>",
help=_i18n(
"""Token used for authenticating first-time connections to the server.
The token can be read from the file referenced by JUPYTER_TOKEN_FILE or set directly
with the JUPYTER_TOKEN environment variable.
When no password is enabled,
the default is to generate a new, random token.
Setting to an empty string disables authentication altogether, which is NOT RECOMMENDED.
Prior to 2.0: configured as ServerApp.token
"""
),
).tag(config=True)

login_handler_class = Type(
default_value="jupyter_server.auth.login.LoginHandler",
klass=web.RequestHandler,
config=True,
help=_i18n("The login handler class to use."),
)

logout_handler_class = Type(
default_value="jupyter_server.auth.login.LoginHandler",
klass=web.RequestHandler,
config=True,
help=_i18n("The logout handler class to use."),
)

token_generated = False

@default("token")
def _token_default(self):
if os.getenv("JUPYTER_TOKEN"):
self.token_generated = False
return os.environ["JUPYTER_TOKEN"]
if os.getenv("JUPYTER_TOKEN_FILE"):
self.token_generated = False
with open(os.environ["JUPYTER_TOKEN_FILE"]) as token_file:
return token_file.read()
if self.password:
# no token if password is enabled
self.token_generated = False
return ""
else:
self.token_generated = True
return binascii.hexlify(os.urandom(24)).decode("ascii")

def get_user(self, handler: JupyterHandler) -> User | None:
"""Get the authenticated user for a request
Must return a :class:`.jupyter_server.auth.User`,
though it may be a subclass.
Return None if the request is not authenticated.
"""
if handler.login_handler is None:
return User("anonymous")
_may_ be a coroutine
"""
if getattr(handler, "_jupyter_current_user", None):
# already authenticated
return handler._jupyter_current_user
token_user = self.get_user_token(handler)
cookie_user = self.get_user_cookie(handler)
# prefer token to cookie if both given,
# because token is always explicit
user = token_user or cookie_user
if token_user:
# if token-authenticated, persist user_id in cookie
# if it hasn't already been stored there
if user != cookie_user:
self.set_login_cookie(handler, cast(User, user))
# Record that the current request has been authenticated with a token.
# Used in is_token_authenticated above.
handler._token_authenticated = True

if user is None:
# If an invalid cookie was sent, clear it to prevent unnecessary
# extra warnings. But don't do this on a request with *no* cookie,
# because that can erroneously log you out (see gh-3365)
if handler.get_cookie(handler.cookie_name) is not None:
handler.log.warning("Clearing invalid/expired login cookie %s", handler.cookie_name)
handler.clear_login_cookie()
if not self.auth_enabled:
# Completely insecure! No authentication at all.
# No need to warn here, though; validate_security will have already done that.
user = self.generate_anonymous_user(handler)

# The default: call LoginHandler.get_user for backward-compatibility
# TODO: move default implementation to this class,
# deprecate `LoginHandler.get_user`
user = handler.login_handler.get_user(handler)
if user and not isinstance(user, User):
return _backward_compat_user(user)
return user

def identity_model(self, user: User) -> dict:
Expand All @@ -138,4 +221,191 @@ def get_handlers(self) -> list:
For example, an OAuth callback handler.
"""
return []
handlers = []
if self.login_available:
handlers.append((r"/login", self.login_handler_class))
if self.logout_available:
handlers.append((r"/logout", self.logout_handler_class))
return handlers

def user_to_cookie(self, user: User) -> str:
"""Serialize a user to a string for storage in a cookie
If overriding in a subclass, make sure to define user_from_cookie as well.
Default is just the user's username.
"""
# default: username is enough
return user.username

def user_from_cookie(self, cookie_value: str) -> User | None:
"""Inverse of user_to_cookie"""
return User(username=cookie_value)

def set_login_cookie(self, handler: JupyterHandler, user: User) -> None:
"""Call this on handlers to set the login cookie for success"""
cookie_options = handler.settings.get("cookie_options", {})
cookie_options.setdefault("httponly", True)
# tornado <4.2 has a bug that considers secure==True as soon as
# 'secure' kwarg is passed to set_secure_cookie
if handler.settings.get("secure_cookie", handler.request.protocol == "https"):
cookie_options.setdefault("secure", True)
cookie_options.setdefault("path", handler.base_url)
handler.set_secure_cookie(handler.cookie_name, self.user_to_cookie(user), **cookie_options)

auth_header_pat = re.compile(r"(token|bearer)\s+(.+)", re.IGNORECASE)

def get_token(self, handler: JupyterHandler) -> str | None:
"""Get the user token from a request
Default:
- in URL parameters: ?token=<token>
- in header: Authorization: token <token>
"""

user_token = handler.get_argument("token", "")
if not user_token:
# get it from Authorization header
m = self.auth_header_pat.match(handler.request.headers.get("Authorization", ""))
if m:
user_token = m.group(1)
return user_token

def get_user_cookie(self, handler: JupyterHandler) -> User | None:
"""Get user from a cookie
Calls user_from_cookie to deserialize cookie value
"""
get_secure_cookie_kwargs = handler.settings.get("get_secure_cookie_kwargs", {})
user_cookie = handler.get_secure_cookie(handler.cookie_name, **get_secure_cookie_kwargs)
if not user_cookie:
return None
user_cookie = user_cookie.decode()
# TODO: try/catch in case of change in config?
try:
return self.user_from_cookie(user_cookie)
except Exception as e:
# log bad cookie itself, only at debug-level
self.log.debug(f"Error unpacking user from cookie: cookie={user_cookie}", exc_info=True)
self.log.error(f"Error unpacking user from cookie: {e}")
return None

def get_user_token(self, handler: JupyterHandler) -> User | None:
"""Identify the user based on a token in the URL or Authorization header
Returns:
- uuid if authenticated
- None if not
"""
token = handler.token
if not token:
return None
# check login token from URL argument or Authorization header
user_token = self.get_token(handler)
authenticated = False
if user_token == token:
# token-authenticated, set the login cookie
handler.log.debug(
"Accepting token-authenticated connection from %s",
handler.request.remote_ip,
)
authenticated = True

if authenticated:
# token does not correspond to user-id,
# which is stored in a cookie.
# still check the cookie for the user id
user = self.get_user_cookie(handler)
if user is None:
user = self.generate_anonymous_user(handler)
return user
else:
return None

def generate_anonymous_user(self, handler: JupyterHandler) -> User:
"""Generate a random anonymous user"""
# no cookie, generate new random User
user_id = uuid.uuid4().hex
handler.log.info(f"Generating new user for token-authenticated request: {user_id}")
return User(user_id)

def should_check_origin(self, handler: JupyterHandler) -> bool:
"""Should the Handler check for CORS origin validation?
Origin check should be skipped for token-authenticated requests.
Returns:
- True, if Handler must check for valid CORS origin.
- False, if Handler should skip origin check since requests are token-authenticated.
"""
return not self.is_token_authenticated(handler)

def is_token_authenticated(self, handler: JupyterHandler) -> bool:
"""Returns True if handler has been token authenticated. Otherwise, False.
Login with a token is used to signal certain things, such as:
- permit access to REST API
- xsrf protection
- skip origin-checks for scripts
"""
# ensure get_user has been called, so we know if we're token-authenticated
handler.current_user # noqa
return getattr(handler, "_token_authenticated", False)

def validate_security(
self,
app: ServerApp,
ssl_options: dict | None = None,
) -> None:
"""Check the application's security.
Show messages, or abort if necessary, based on the security configuration.
"""
if not app.ip:
warning = "WARNING: The Jupyter server is listening on all IP addresses"
if ssl_options is None:
app.log.warning(f"{warning} and not using encryption. This is not recommended.")
if not self.auth_enabled:
app.log.warning(
f"{warning} and not using authentication. "
"This is highly insecure and not recommended."
)
else:
if not self.auth_enabled:
app.log.warning(
"All authentication is disabled."
" Anyone who can connect to this server will be able to run code."
)

@property
def auth_enabled(self):
"""Is authentication enabled?
Should always be true, but may be False if requests with no auth are allowed.
Previously: LoginHandler.get_login_available
"""
return True

@property
def login_available(self):
"""Whether a LoginHandler is needed - and therefore whether the login page should be displayed."""
return self.auth_enabled


class PasswordIdentityProvider(IdentityProvider):

password = Unicode(help="""Hashed password""", config=True)

@property
def login_available(self):
"""Whether a LoginHandler is needed - and therefore whether the login page should be displayed."""

return bool(self.password or self.token)

@property
def auth_enabled(self):
"""Return whether any auth is enabled"""
return bool(self.password or self.token)
Loading

0 comments on commit 136dc59

Please sign in to comment.