diff --git a/docker/docker-compose-with-oauth-jwt-token.yml b/docker/docker-compose-with-oauth-jwt-token.yml new file mode 100644 index 000000000..b62197241 --- /dev/null +++ b/docker/docker-compose-with-oauth-jwt-token.yml @@ -0,0 +1,93 @@ +services: + # When scaling the opal-server to multiple nodes and/or multiple workers, we use + # a *broadcast* channel to sync between all the instances of opal-server. + # Under the hood, this channel is implemented by encode/broadcaster (see link below). + # At the moment, the broadcast channel can be either: postgresdb, redis or kafka. + # The format of the broadcaster URI string (the one we pass to opal server as `OPAL_BROADCAST_URI`) is specified here: + # https://github.com/encode/broadcaster#available-backends + broadcast_channel: + image: postgres:alpine + environment: + - POSTGRES_DB=postgres + - POSTGRES_USER=postgres + - POSTGRES_PASSWORD=postgres + opal_server: + # by default we run opal-server from latest official image + image: permitio/opal-server:latest + environment: + # the broadcast backbone uri used by opal server workers (see comments above for: broadcast_channel) + - OPAL_BROADCAST_URI=postgres://postgres:postgres@broadcast_channel:5432/postgres + # number of uvicorn workers to run inside the opal-server container + - UVICORN_NUM_WORKERS=4 + # the git repo hosting our policy + # - if this repo is not public, you can pass an ssh key via `OPAL_POLICY_REPO_SSH_KEY`) + # - the repo we pass in this example is *public* and acts as an example repo with dummy rego policy + # - for more info, see: https://docs.opal.ac/tutorials/track_a_git_repo + - OPAL_POLICY_REPO_URL=https://github.com/permitio/opal-example-policy-repo + # in this example we will use a polling interval of 30 seconds to check for new policy updates (git commits affecting the rego policy). + # however, it is better to utilize a git *webhook* to trigger the server to check for changes only when the repo has new commits. + # for more info see: https://docs.opal.ac/tutorials/track_a_git_repo + - OPAL_POLICY_REPO_POLLING_INTERVAL=30 + # configures from where the opal client should initially fetch data (when it first goes up, after disconnection, etc). + # the data sources represents from where the opal clients should get a "complete picture" of the data they need. + # after the initial sources are fetched, the client will subscribe only to update notifications sent by the server. + - OPAL_DATA_CONFIG_SOURCES={"config":{"entries":[{"url":"http://opal_server:7002/policy-data","topics":["policy_data"],"dst_path":"/static"}]}} + - OPAL_LOG_FORMAT_INCLUDE_PID=true + # to protect resources with OAuth2 Opaque token provided by dedicated server + - OPAL_AUTH_TYPE=oauth2 + # URL to generate new OAuth 2.0 Client Credentials Grant token + - OPAL_OAUTH2_TOKEN_URL=https://example/oauth2/token + # JWT validation + - OPAL_OAUTH2_OPENID_CONFIGURATION_URL=https://example/.well-known/openid-configuration + - OPAL_OAUTH2_EXACT_MATCH_CLAIMS=aud=some_audience,iss=some_issuer + - OPAL_OAUTH2_REQUIRED_CLAIMS=sub,iat,exp + - OPAL_OAUTH2_JWT_ALGORITHM=RS256 + - OPAL_OAUTH2_JWT_AUDIENCE=some_audience + - OPAL_OAUTH2_JWT_ISSUER=https://example/issuer + ports: + # exposes opal server on the host machine, you can access the server at: http://localhost:7002 + - "7002:7002" + depends_on: + - broadcast_channel + opal_client: + # by default we run opal-client from latest official image + image: permitio/opal-client:latest + environment: + - OPAL_SERVER_URL=http://opal_server:7002 + - OPAL_LOG_FORMAT_INCLUDE_PID=true + - OPAL_INLINE_OPA_LOG_FORMAT=http + # to protect resources with OAuth2 Opaque token provided by dedicated server + - OPAL_AUTH_TYPE=oauth2 + # client credentials + - OPAL_OAUTH2_CLIENT_ID=some_client_id + - OPAL_OAUTH2_CLIENT_SECRET=some_client_secret + # URL to generate new OAuth 2.0 Client Credentials Grant token + - OPAL_OAUTH2_TOKEN_URL=https://example/oauth2/token + # JWT validation + - OPAL_OAUTH2_OPENID_CONFIGURATION_URL=https://example/.well-known/openid-configuration + - OPAL_OAUTH2_EXACT_MATCH_CLAIMS=aud=some_audience,iss=some_issuer + - OPAL_OAUTH2_REQUIRED_CLAIMS=sub,iat,exp + - OPAL_OAUTH2_JWT_ALGORITHM=RS256 + - OPAL_OAUTH2_JWT_AUDIENCE=some_audience + - OPAL_OAUTH2_JWT_ISSUER=https://example/issuer + # Enable Authorization / Authentication in OPA + - 'OPAL_INLINE_OPA_CONFIG={"authentication":"token", "authorization":"basic", "files": ["authz.rego"]}' + volumes: + # The goal is to create an initial authorization rego that allows OPAL to write the first policy from the POLICY_REPO_URL. + # This is achieved through policy overwrite based on the "id" attribute. + # When the authz.rego file is placed in the root directory of OPA, it is given the id 'authz.rego'. + # Similarly, if there is another authz.rego file in the root of POLICY_REPO_URL, it will also be given the id 'authz.rego'. + # Therefore, if the authz.rego file from the POLICY_REPO_URL exists, it will overwrite the initial authz.rego file. + - ./docker_files/policy_test/authz.rego:/opal/authz.rego + ports: + # exposes opal client on the host machine, you can access the client at: http://localhost:7766 + - "7766:7000" + # exposes the OPA agent (being run by OPAL) on the host machine + # you can access the OPA api that you know and love at: http://localhost:8181 + # OPA api docs are at: https://www.openpolicyagent.org/docs/latest/rest-api/ + - "8181:8181" + depends_on: + - opal_server + # this command is not necessary when deploying OPAL for real, it is simply a trick for dev environments + # to make sure that opal-server is already up before starting the client. + command: sh -c "exec ./wait-for.sh opal_server:7002 --timeout=20 -- ./start.sh" diff --git a/docker/docker-compose-with-oauth-opaque-token.yml b/docker/docker-compose-with-oauth-opaque-token.yml new file mode 100644 index 000000000..7641cd0e8 --- /dev/null +++ b/docker/docker-compose-with-oauth-opaque-token.yml @@ -0,0 +1,83 @@ +services: + # When scaling the opal-server to multiple nodes and/or multiple workers, we use + # a *broadcast* channel to sync between all the instances of opal-server. + # Under the hood, this channel is implemented by encode/broadcaster (see link below). + # At the moment, the broadcast channel can be either: postgresdb, redis or kafka. + # The format of the broadcaster URI string (the one we pass to opal server as `OPAL_BROADCAST_URI`) is specified here: + # https://github.com/encode/broadcaster#available-backends + broadcast_channel: + image: postgres:alpine + environment: + - POSTGRES_DB=postgres + - POSTGRES_USER=postgres + - POSTGRES_PASSWORD=postgres + opal_server: + # by default we run opal-server from latest official image + image: permitio/opal-server:latest + environment: + # the broadcast backbone uri used by opal server workers (see comments above for: broadcast_channel) + - OPAL_BROADCAST_URI=postgres://postgres:postgres@broadcast_channel:5432/postgres + # number of uvicorn workers to run inside the opal-server container + - UVICORN_NUM_WORKERS=4 + # the git repo hosting our policy + # - if this repo is not public, you can pass an ssh key via `OPAL_POLICY_REPO_SSH_KEY`) + # - the repo we pass in this example is *public* and acts as an example repo with dummy rego policy + # - for more info, see: https://docs.opal.ac/tutorials/track_a_git_repo + - OPAL_POLICY_REPO_URL=https://github.com/permitio/opal-example-policy-repo + # in this example we will use a polling interval of 30 seconds to check for new policy updates (git commits affecting the rego policy). + # however, it is better to utilize a git *webhook* to trigger the server to check for changes only when the repo has new commits. + # for more info see: https://docs.opal.ac/tutorials/track_a_git_repo + - OPAL_POLICY_REPO_POLLING_INTERVAL=30 + # configures from where the opal client should initially fetch data (when it first goes up, after disconnection, etc). + # the data sources represents from where the opal clients should get a "complete picture" of the data they need. + # after the initial sources are fetched, the client will subscribe only to update notifications sent by the server. + - OPAL_DATA_CONFIG_SOURCES={"config":{"entries":[{"url":"http://opal_server:7002/policy-data","topics":["policy_data"],"dst_path":"/static"}]}} + - OPAL_LOG_FORMAT_INCLUDE_PID=true + # to protect resources with OAuth2 Opaque token provided by dedicated server + - OPAL_AUTH_TYPE=oauth2 + # URL to generate new OAuth 2.0 Client Credentials Grant token + - OPAL_OAUTH2_TOKEN_URL=https://example/oauth2/token + # introspect URL for Opaque token validation + - OPAL_OAUTH2_INTROSPECT_URL=https://example/oauth2/introspect + ports: + # exposes opal server on the host machine, you can access the server at: http://localhost:7002 + - "7002:7002" + depends_on: + - broadcast_channel + opal_client: + # by default we run opal-client from latest official image + image: permitio/opal-client:latest + environment: + - OPAL_SERVER_URL=http://opal_server:7002 + - OPAL_LOG_FORMAT_INCLUDE_PID=true + - OPAL_INLINE_OPA_LOG_FORMAT=http + # to protect resources with OAuth2 Opaque token provided by dedicated server + - OPAL_AUTH_TYPE=oauth2 + # client credentials + - OPAL_OAUTH2_CLIENT_ID=some_client_id + - OPAL_OAUTH2_CLIENT_SECRET=some_client_secret + # URL to generate new OAuth 2.0 Client Credentials Grant token + - OPAL_OAUTH2_TOKEN_URL=https://example/oauth2/token + # introspect URL for Opaque token validation + - OPAL_OAUTH2_INTROSPECT_URL=https://example/oauth2/introspect + # Enable Authorization / Authentication in OPA + - 'OPAL_INLINE_OPA_CONFIG={"authentication":"token", "authorization":"basic", "files": ["authz.rego"]}' + volumes: + # The goal is to create an initial authorization rego that allows OPAL to write the first policy from the POLICY_REPO_URL. + # This is achieved through policy overwrite based on the "id" attribute. + # When the authz.rego file is placed in the root directory of OPA, it is given the id 'authz.rego'. + # Similarly, if there is another authz.rego file in the root of POLICY_REPO_URL, it will also be given the id 'authz.rego'. + # Therefore, if the authz.rego file from the POLICY_REPO_URL exists, it will overwrite the initial authz.rego file. + - ./docker_files/policy_test/authz.rego:/opal/authz.rego + ports: + # exposes opal client on the host machine, you can access the client at: http://localhost:7766 + - "7766:7000" + # exposes the OPA agent (being run by OPAL) on the host machine + # you can access the OPA api that you know and love at: http://localhost:8181 + # OPA api docs are at: https://www.openpolicyagent.org/docs/latest/rest-api/ + - "8181:8181" + depends_on: + - opal_server + # this command is not necessary when deploying OPAL for real, it is simply a trick for dev environments + # to make sure that opal-server is already up before starting the client. + command: sh -c "exec ./wait-for.sh opal_server:7002 --timeout=20 -- ./start.sh" diff --git a/packages/opal-client/opal_client/callbacks/api.py b/packages/opal-client/opal_client/callbacks/api.py index a2e2d5a63..90b1e6ecd 100644 --- a/packages/opal-client/opal_client/callbacks/api.py +++ b/packages/opal-client/opal_client/callbacks/api.py @@ -3,8 +3,8 @@ from fastapi import APIRouter, Depends, HTTPException, Response, status from opal_client.callbacks.register import CallbacksRegister from opal_client.config import opal_client_config +from opal_common.authentication.authenticator import Authenticator from opal_common.authentication.authz import require_peer_type -from opal_common.authentication.deps import JWTAuthenticator from opal_common.authentication.types import JWTClaims from opal_common.authentication.verifier import Unauthorized from opal_common.logger import logger @@ -13,7 +13,7 @@ from starlette.status import HTTP_500_INTERNAL_SERVER_ERROR -def init_callbacks_api(authenticator: JWTAuthenticator, register: CallbacksRegister): +def init_callbacks_api(authenticator: Authenticator, register: CallbacksRegister): async def require_listener_token(claims: JWTClaims = Depends(authenticator)): try: require_peer_type( diff --git a/packages/opal-client/opal_client/client.py b/packages/opal-client/opal_client/client.py index 7944f65d8..b6d48ec73 100644 --- a/packages/opal-client/opal_client/client.py +++ b/packages/opal-client/opal_client/client.py @@ -2,9 +2,7 @@ import functools import os import signal -import tempfile import uuid -from logging import disable from typing import Awaitable, Callable, List, Literal, Optional, Union import aiofiles @@ -19,8 +17,8 @@ from opal_client.callbacks.register import CallbacksRegister from opal_client.config import PolicyStoreTypes, opal_client_config from opal_client.data.api import init_data_router -from opal_client.data.fetcher import DataFetcher from opal_client.data.updater import DataUpdater +from opal_client.data.updater_factory import DataUpdaterFactory from opal_client.engine.options import CedarServerOptions, OpaServerOptions from opal_client.engine.runner import CedarRunner, OpaRunner from opal_client.limiter import StartupLoadLimiter @@ -31,8 +29,8 @@ from opal_client.policy_store.policy_store_client_factory import ( PolicyStoreClientFactory, ) -from opal_common.authentication.deps import JWTAuthenticator -from opal_common.authentication.verifier import JWTVerifier +from opal_common.authentication.authenticator import Authenticator +from opal_common.authentication.authenticator_factory import AuthenticatorFactory from opal_common.config import opal_common_config from opal_common.logger import configure_logs, logger from opal_common.middleware import configure_middleware @@ -51,7 +49,7 @@ def __init__( inline_opa_options: OpaServerOptions = None, inline_cedar_enabled: bool = None, inline_cedar_options: CedarServerOptions = None, - verifier: Optional[JWTVerifier] = None, + authenticator: Optional[Authenticator] = None, store_backup_path: Optional[str] = None, store_backup_interval: Optional[int] = None, offline_mode_enabled: bool = False, @@ -70,6 +68,10 @@ def __init__( data_updater (DataUpdater, optional): Defaults to None. policy_updater (PolicyUpdater, optional): Defaults to None. """ + if authenticator is not None: + self.authenticator = authenticator + else: + self.authenticator = AuthenticatorFactory.create() self._shard_id = shard_id # defaults policy_store_type: PolicyStoreTypes = ( @@ -127,6 +129,7 @@ def __init__( opal_client_id=opal_client_identifier, on_connect=on_policy_updater_connect, on_disconnect=on_policy_updater_disconnect, + authenticator=self.authenticator, ) else: self.policy_updater = None @@ -142,7 +145,7 @@ def __init__( else opal_client_config.DATA_TOPICS ) - self.data_updater = DataUpdater( + self.data_updater = DataUpdaterFactory.create( policy_store=self.policy_store, data_topics=data_topics, callbacks_register=self._callbacks_register, @@ -150,6 +153,7 @@ def __init__( shard_id=self._shard_id, on_connect=on_data_updater_connect, on_disconnect=on_data_updater_disconnect, + authenticator=self.authenticator, ) else: self.data_updater = None @@ -172,19 +176,6 @@ def __init__( "OPAL client is configured to trust self-signed certificates" ) - if verifier is not None: - self.verifier = verifier - else: - self.verifier = JWTVerifier( - public_key=opal_common_config.AUTH_PUBLIC_KEY, - algorithm=opal_common_config.AUTH_JWT_ALGORITHM, - audience=opal_common_config.AUTH_JWT_AUDIENCE, - issuer=opal_common_config.AUTH_JWT_ISSUER, - ) - if not self.verifier.enabled: - logger.info( - "API authentication disabled (public encryption key was not provided)" - ) self.store_backup_path = ( store_backup_path or opal_client_config.STORE_BACKUP_PATH ) @@ -264,13 +255,13 @@ async def _is_ready(self): def _configure_api_routes(self, app: FastAPI): """Mounts the api routes on the app object.""" - authenticator = JWTAuthenticator(self.verifier) - # Init api routers with required dependencies policy_router = init_policy_router(policy_updater=self.policy_updater) data_router = init_data_router(data_updater=self.data_updater) - policy_store_router = init_policy_store_router(authenticator) - callbacks_router = init_callbacks_api(authenticator, self._callbacks_register) + policy_store_router = init_policy_store_router(self.authenticator) + callbacks_router = init_callbacks_api( + self.authenticator, self._callbacks_register + ) # mount the api routes on the app object app.include_router(policy_router, tags=["Policy Updater"]) diff --git a/packages/opal-client/opal_client/data/oauth2_updater.py b/packages/opal-client/opal_client/data/oauth2_updater.py new file mode 100644 index 000000000..0a9650647 --- /dev/null +++ b/packages/opal-client/opal_client/data/oauth2_updater.py @@ -0,0 +1,46 @@ +from urllib.parse import parse_qs, urlencode, urlparse + +import aiohttp +from aiohttp.client import ClientSession +from opal_client.logger import logger + +from .updater import DefaultDataUpdater + + +class OAuth2DataUpdater(DefaultDataUpdater): + async def _load_policy_data_config( + self, url: str, headers + ) -> aiohttp.ClientResponse: + await self._authenticator.authenticate(headers) + + async with ClientSession(headers=headers) as session: + response = await session.get( + url, **self._ssl_context_kwargs, allow_redirects=False + ) + + if response.status == 307: + return await self._load_redirected_policy_data_config( + response.headers['location'], headers + ) + else: + return response + + async def _load_redirected_policy_data_config(self, url: str, headers): + redirect_url = self.__redirect_url(url) + + logger.info( + "Redirecting to data-sources configuration '{source}'", source=redirect_url + ) + + async with ClientSession(headers=headers) as session: + return await session.get( + redirect_url, **self._ssl_context_kwargs, allow_redirects=False + ) + + def __redirect_url(self, url: str) -> str: + u = urlparse(url) + query = parse_qs(u.query, keep_blank_values=True) + query.pop("token", None) + u = u._replace(query=urlencode(query, True)) + + return u.geturl() diff --git a/packages/opal-client/opal_client/data/updater.py b/packages/opal-client/opal_client/data/updater.py index c99ca3884..f32d5b811 100644 --- a/packages/opal-client/opal_client/data/updater.py +++ b/packages/opal-client/opal_client/data/updater.py @@ -25,6 +25,8 @@ DEFAULT_POLICY_STORE_GETTER, ) from opal_common.async_utils import TakeANumberQueue, TasksPool, repeated_call +from opal_common.authentication.authenticator import Authenticator +from opal_common.authentication.authenticator_factory import AuthenticatorFactory from opal_common.config import opal_common_config from opal_common.fetcher.events import FetcherConfig from opal_common.http_utils import is_http_error_response @@ -42,6 +44,54 @@ class DataUpdater: + async def trigger_data_update(self, update: DataUpdate): + raise NotImplementedError() + + async def get_policy_data_config(self, url: str = None) -> DataSourceConfig: + raise NotImplementedError() + + async def get_base_policy_data( + self, config_url: str = None, data_fetch_reason="Initial load" + ): + raise NotImplementedError() + + async def on_connect(self, client: PubSubClient, channel: RpcChannel): + raise NotImplementedError() + + async def on_disconnect(self, channel: RpcChannel): + raise NotImplementedError() + + async def start(self): + raise NotImplementedError() + + async def stop(self): + raise NotImplementedError() + + async def wait_until_done(self): + raise NotImplementedError() + + @staticmethod + def calc_hash(data): + """Calculate an hash (sah256) on the given data, if data isn't a + string, it will be converted to JSON. + + String are encoded as 'utf-8' prior to hash calculation. + Returns: + the hash of the given data (as a a hexdigit string) or '' on failure to process. + """ + try: + if not isinstance(data, str): + data = json.dumps(data, default=pydantic_encoder) + return hashlib.sha256(data.encode("utf-8")).hexdigest() + except: + logger.exception("Failed to calculate hash for data {data}", data=data) + return "" + + @property + def callbacks_reporter(self) -> CallbacksReporter: + raise NotImplementedError() + +class DefaultDataUpdater(DataUpdater): def __init__( self, token: str = None, @@ -57,6 +107,7 @@ def __init__( shard_id: Optional[str] = None, on_connect: List[PubSubOnConnectCallback] = None, on_disconnect: List[OnDisconnectCallback] = None, + authenticator: Optional[Authenticator] = None, ): """Keeps policy-stores (e.g. OPA) up to date with relevant data Obtains data configuration on startup from OPAL-server Uses Pub/Sub to @@ -137,6 +188,10 @@ def __init__( self._polling_update_tasks = [] self._on_connect_callbacks = on_connect or [] self._on_disconnect_callbacks = on_disconnect or [] + if authenticator is not None: + self._authenticator = authenticator + else: + self._authenticator = AuthenticatorFactory.create() async def __aenter__(self): await self.start() @@ -182,20 +237,32 @@ async def get_policy_data_config(self, url: str = None) -> DataSourceConfig: if url is None: url = self._data_sources_config_url logger.info("Getting data-sources configuration from '{source}'", source=url) + + headers = {} + if self._extra_headers is not None: + headers = self._extra_headers.copy() + headers["Accept"] = "application/json" + try: - async with ClientSession(headers=self._extra_headers) as session: - response = await session.get(url, **self._ssl_context_kwargs) - if response.status == 200: - return DataSourceConfig.parse_obj(await response.json()) - else: - error_details = await response.json() - raise ClientError( - f"Fetch data sources failed with status code {response.status}, error: {error_details}" - ) + response = await self._load_policy_data_config(url, headers) + + if response.status == 200: + return DataSourceConfig.parse_obj(await response.json()) + else: + error_details = await response.text() + raise ClientError( + f"Fetch data sources failed with status code {response.status}, error: {error_details}" + ) except: logger.exception(f"Failed to load data sources config") raise + async def _load_policy_data_config( + self, url: str, headers + ) -> aiohttp.ClientResponse: + async with ClientSession(headers=headers) as session: + return await session.get(url, **self._ssl_context_kwargs) + async def get_base_policy_data( self, config_url: str = None, data_fetch_reason="Initial load" ): @@ -279,13 +346,19 @@ async def _subscriber(self): """Coroutine meant to be spunoff with create_task to listen in the background for data events and pass them to the data_fetcher.""" logger.info("Subscribing to topics: {topics}", topics=self._data_topics) + + headers = {} + if self._extra_headers is not None: + headers = self._extra_headers.copy() + await self._authenticator.authenticate(headers) + self._client = PubSubClient( self._data_topics, self._update_policy_data_callback, methods_class=TenantAwareRpcEventClientMethods, on_connect=[self.on_connect, *self._on_connect_callbacks], on_disconnect=[self.on_disconnect, *self._on_disconnect_callbacks], - extra_headers=self._extra_headers, + extra_headers=headers, keep_alive=opal_client_config.KEEP_ALIVE_INTERVAL, server_uri=self._server_url, **self._ssl_context_kwargs, @@ -344,23 +417,6 @@ async def wait_until_done(self): if self._subscriber_task is not None: await self._subscriber_task - @staticmethod - def calc_hash(data): - """Calculate an hash (sah256) on the given data, if data isn't a - string, it will be converted to JSON. - - String are encoded as 'utf-8' prior to hash calculation. - Returns: - the hash of the given data (as a a hexdigit string) or '' on failure to process. - """ - try: - if not isinstance(data, str): - data = json.dumps(data, default=pydantic_encoder) - return hashlib.sha256(data.encode("utf-8")).hexdigest() - except: - logger.exception("Failed to calculate hash for data {data}", data=data) - return "" - async def _update_policy_data( self, update: DataUpdate, @@ -473,7 +529,9 @@ async def _store_fetched_update(self, update_item): policy_data = result # Create a report on the data-fetching report = DataEntryReport( - entry=entry, hash=self.calc_hash(policy_data), fetched=True + entry=entry, + hash=DataUpdater.calc_hash(policy_data), + fetched=True ) try: diff --git a/packages/opal-client/opal_client/data/updater_factory.py b/packages/opal-client/opal_client/data/updater_factory.py new file mode 100644 index 000000000..ad784f996 --- /dev/null +++ b/packages/opal-client/opal_client/data/updater_factory.py @@ -0,0 +1,70 @@ +from typing import List, Optional + +from fastapi_websocket_pubsub.pub_sub_client import PubSubOnConnectCallback +from fastapi_websocket_rpc.rpc_channel import OnDisconnectCallback +from opal_client.callbacks.register import CallbacksRegister +from opal_client.data.fetcher import DataFetcher +from opal_client.policy_store.base_policy_store_client import BasePolicyStoreClient +from opal_common.authentication.authenticator import Authenticator +from opal_common.config import opal_common_config +from opal_common.logger import logger + +from .oauth2_updater import OAuth2DataUpdater +from .updater import DataUpdater, DefaultDataUpdater + + +class DataUpdaterFactory: + @staticmethod + def create( + token: str = None, + pubsub_url: str = None, + data_sources_config_url: str = None, + fetch_on_connect: bool = True, + data_topics: List[str] = None, + policy_store: BasePolicyStoreClient = None, + should_send_reports=None, + data_fetcher: Optional[DataFetcher] = None, + callbacks_register: Optional[CallbacksRegister] = None, + opal_client_id: str = None, + shard_id: Optional[str] = None, + on_connect: List[PubSubOnConnectCallback] = None, + on_disconnect: List[OnDisconnectCallback] = None, + authenticator: Optional[Authenticator] = None, + ) -> DataUpdater: + if opal_common_config.AUTH_TYPE == "oauth2": + logger.info( + "OPAL is running in secure mode - will authenticate Datasource requests with OAuth2 tokens." + ) + return OAuth2DataUpdater( + token, + pubsub_url, + data_sources_config_url, + fetch_on_connect, + data_topics, + policy_store, + should_send_reports, + data_fetcher, + callbacks_register, + opal_client_id, + shard_id, + on_connect, + on_disconnect, + authenticator, + ) + else: + return DefaultDataUpdater( + token, + pubsub_url, + data_sources_config_url, + fetch_on_connect, + data_topics, + policy_store, + should_send_reports, + data_fetcher, + callbacks_register, + opal_client_id, + shard_id, + on_connect, + on_disconnect, + authenticator, + ) diff --git a/packages/opal-client/opal_client/policy/fetcher.py b/packages/opal-client/opal_client/policy/fetcher.py index b7c8c543f..13e6424cb 100644 --- a/packages/opal-client/opal_client/policy/fetcher.py +++ b/packages/opal-client/opal_client/policy/fetcher.py @@ -4,6 +4,8 @@ from fastapi import HTTPException, status from opal_client.config import opal_client_config from opal_client.logger import logger +from opal_common.authentication.authenticator import Authenticator +from opal_common.authentication.authenticator_factory import AuthenticatorFactory from opal_common.schemas.policy import PolicyBundle from opal_common.security.sslcontext import get_custom_ssl_context from opal_common.utils import ( @@ -28,15 +30,27 @@ def force_valid_bundle(bundle) -> PolicyBundle: class PolicyFetcher: """Fetches policy from backend.""" - def __init__(self, backend_url=None, token=None): + def __init__( + self, + backend_url=None, + token=None, + authenticator: Optional[Authenticator] = None, + ): """ Args: backend_url (str): Defaults to opal_client_config.SERVER_URL. token ([type], optional): [description]. Defaults to opal_client_config.CLIENT_TOKEN. """ + if authenticator is not None: + self._authenticator = authenticator + else: + self._authenticator = AuthenticatorFactory.create() self._token = token or opal_client_config.CLIENT_TOKEN self._backend_url = backend_url or opal_client_config.SERVER_URL - self._auth_headers = tuple_to_dict(get_authorization_header(self._token)) + if self._token is not None: + self._auth_headers = tuple_to_dict(get_authorization_header(self._token)) + else: + self._auth_headers = dict() self._retry_config = ( opal_client_config.POLICY_UPDATER_CONN_RETRY.toTenacityConfig() @@ -82,10 +96,15 @@ async def _fetch_policy_bundle( May throw, in which case we retry again. """ + headers = {} + if self._auth_headers is not None: + headers = self._auth_headers.copy() + await self._authenticator.authenticate(headers) + params = {"path": directories} if base_hash is not None: params["base_hash"] = base_hash - async with aiohttp.ClientSession() as session: + async with aiohttp.ClientSession(headers=headers) as session: logger.info( "Fetching policy bundle from {url}", url=self._policy_endpoint_url, diff --git a/packages/opal-client/opal_client/policy/updater.py b/packages/opal-client/opal_client/policy/updater.py index 998b9b608..34c6a923c 100644 --- a/packages/opal-client/opal_client/policy/updater.py +++ b/packages/opal-client/opal_client/policy/updater.py @@ -17,6 +17,8 @@ DEFAULT_POLICY_STORE_GETTER, ) from opal_common.async_utils import TakeANumberQueue, TasksPool +from opal_common.authentication.authenticator import Authenticator +from opal_common.authentication.authenticator_factory import AuthenticatorFactory from opal_common.config import opal_common_config from opal_common.schemas.data import DataUpdateReport from opal_common.schemas.policy import PolicyBundle, PolicyUpdateMessage @@ -46,6 +48,7 @@ def __init__( opal_client_id: str = None, on_connect: List[PubSubOnConnectCallback] = None, on_disconnect: List[OnDisconnectCallback] = None, + authenticator: Optional[Authenticator] = None, ): """Inits the policy updater. @@ -67,6 +70,10 @@ def __init__( self._opal_client_id = opal_client_id self._scope_id = opal_client_config.SCOPE_ID + if authenticator is not None: + self._authenticator = authenticator + else: + self._authenticator = AuthenticatorFactory.create() # The policy store we'll save policy modules into (i.e: OPA) self._policy_store = policy_store or DEFAULT_POLICY_STORE_GETTER() # pub/sub server url and authentication data @@ -90,7 +97,7 @@ def __init__( self._policy_update_task = None self._stopping = False # policy fetcher - fetches policy bundles - self._policy_fetcher = PolicyFetcher() + self._policy_fetcher = PolicyFetcher(authenticator=self._authenticator) # callbacks on policy changes self._data_fetcher = data_fetcher or DataFetcher() self._callbacks_register = callbacks_register or CallbacksRegister() @@ -245,12 +252,18 @@ async def _subscriber(self): update_policy() callback (which will fetch the relevant policy bundle from the server and update the policy store).""" logger.info("Subscribing to topics: {topics}", topics=self._topics) + + headers = {} + if self._extra_headers is not None: + headers = self._extra_headers.copy() + await self._authenticator.authenticate(headers) + self._client = PubSubClient( topics=self._topics, callback=self._update_policy_callback, on_connect=[self._on_connect, *self._on_connect_callbacks], on_disconnect=[self._on_disconnect, *self._on_disconnect_callbacks], - extra_headers=self._extra_headers, + extra_headers=headers, keep_alive=opal_client_config.KEEP_ALIVE_INTERVAL, server_uri=self._server_url, **self._ssl_context_kwargs, diff --git a/packages/opal-client/opal_client/policy_store/api.py b/packages/opal-client/opal_client/policy_store/api.py index b27d83d70..97113f109 100644 --- a/packages/opal-client/opal_client/policy_store/api.py +++ b/packages/opal-client/opal_client/policy_store/api.py @@ -1,15 +1,15 @@ from fastapi import APIRouter, Depends from opal_client.config import opal_client_config from opal_client.policy_store.schemas import PolicyStoreAuth, PolicyStoreDetails +from opal_common.authentication.authenticator import Authenticator from opal_common.authentication.authz import require_peer_type -from opal_common.authentication.deps import JWTAuthenticator from opal_common.authentication.types import JWTClaims from opal_common.authentication.verifier import Unauthorized from opal_common.logger import logger from opal_common.schemas.security import PeerType -def init_policy_store_router(authenticator: JWTAuthenticator): +def init_policy_store_router(authenticator: Authenticator): router = APIRouter() @router.get( diff --git a/packages/opal-client/opal_client/tests/data_updater_test.py b/packages/opal-client/opal_client/tests/data_updater_test.py index f2b27b0fb..9ddae3010 100644 --- a/packages/opal-client/opal_client/tests/data_updater_test.py +++ b/packages/opal-client/opal_client/tests/data_updater_test.py @@ -21,7 +21,12 @@ from opal_client.config import opal_client_config from opal_client.data.rpc import TenantAwareRpcEventClientMethods -from opal_client.data.updater import DataSourceEntry, DataUpdate, DataUpdater +from opal_client.data.updater import ( + DataSourceEntry, + DataUpdate, + DataUpdater, + DefaultDataUpdater, +) from opal_client.policy_store.policy_store_client_factory import ( PolicyStoreClientFactory, ) @@ -167,7 +172,7 @@ async def test_data_updater(server): server trigger a Data-update and check our policy store gets the update.""" # config to use mock OPA policy_store = PolicyStoreClientFactory.create(store_type=PolicyStoreTypes.MOCK) - updater = DataUpdater( + updater = DefaultDataUpdater( pubsub_url=UPDATES_URL, policy_store=policy_store, fetch_on_connect=False, @@ -233,7 +238,7 @@ async def test_data_updater_with_report_callback(server): server trigger a Data-update and check our policy store gets the update.""" # config to use mock OPA policy_store = PolicyStoreClientFactory.create(store_type=PolicyStoreTypes.MOCK) - updater = DataUpdater( + updater = DefaultDataUpdater( pubsub_url=UPDATES_URL, policy_store=policy_store, fetch_on_connect=False, @@ -293,7 +298,7 @@ async def test_client_get_initial_data(server): """Connect to OPAL-server and make sure data is fetched on-connect.""" # config to use mock OPA policy_store = PolicyStoreClientFactory.create(store_type=PolicyStoreTypes.MOCK) - updater = DataUpdater( + updater = DefaultDataUpdater( pubsub_url=UPDATES_URL, data_sources_config_url=DATA_CONFIG_URL, policy_store=policy_store, diff --git a/packages/opal-client/opal_client/tests/server_to_client_intergation_test.py b/packages/opal-client/opal_client/tests/server_to_client_intergation_test.py index a3372c56f..5ce829fd7 100644 --- a/packages/opal-client/opal_client/tests/server_to_client_intergation_test.py +++ b/packages/opal-client/opal_client/tests/server_to_client_intergation_test.py @@ -18,7 +18,7 @@ from opal_client import OpalClient from opal_client.data.rpc import TenantAwareRpcEventClientMethods -from opal_client.data.updater import DataSourceEntry, DataUpdate, DataUpdater +from opal_client.data.updater import DataSourceEntry, DataUpdate, DefaultDataUpdater from opal_client.policy_store.mock_policy_store_client import MockPolicyStoreClient from opal_client.policy_store.policy_store_client_factory import ( PolicyStoreClientFactory, @@ -76,7 +76,7 @@ async def startup_event(): def setup_client(event): # config to use mock OPA policy_store = PolicyStoreClientFactory.create(store_type=PolicyStoreTypes.MOCK) - data_updater = DataUpdater( + data_updater = DefaultDataUpdater( pubsub_url=UPDATES_URL, data_sources_config_url=DATA_CONFIG_URL, policy_store=policy_store, diff --git a/packages/opal-common/opal_common/authentication/authenticator.py b/packages/opal-common/opal_common/authentication/authenticator.py new file mode 100644 index 000000000..87e210ad4 --- /dev/null +++ b/packages/opal-common/opal_common/authentication/authenticator.py @@ -0,0 +1,15 @@ +from typing import Optional + +from opal_common.authentication.signer import JWTSigner + + +class Authenticator: + @property + def enabled(self) -> bool: + raise NotImplementedError() + + def signer(self) -> Optional[JWTSigner]: + raise NotImplementedError() + + async def authenticate(self, headers): + raise NotImplementedError() diff --git a/packages/opal-common/opal_common/authentication/authenticator_factory.py b/packages/opal-common/opal_common/authentication/authenticator_factory.py new file mode 100644 index 000000000..c0bcc40ad --- /dev/null +++ b/packages/opal-common/opal_common/authentication/authenticator_factory.py @@ -0,0 +1,34 @@ +from opal_common.authentication.authenticator import Authenticator +from opal_common.authentication.deps import JWTAuthenticator +from opal_common.authentication.verifier import JWTVerifier, Unauthorized +from opal_common.config import opal_common_config +from opal_common.logger import logger + +from .oauth2 import CachedOAuth2Authenticator, OAuth2ClientCredentialsAuthenticator + + +class AuthenticatorFactory: + @staticmethod + def create() -> Authenticator: + if opal_common_config.AUTH_TYPE == "oauth2": + logger.info( + "OPAL is running in secure mode - will authenticate API requests with OAuth2 tokens." + ) + return CachedOAuth2Authenticator(OAuth2ClientCredentialsAuthenticator()) + else: + return JWTAuthenticator(AuthenticatorFactory.__verifier()) + + @staticmethod + def __verifier() -> JWTVerifier: + verifier = JWTVerifier( + public_key=opal_common_config.AUTH_PUBLIC_KEY, + algorithm=opal_common_config.AUTH_JWT_ALGORITHM, + audience=opal_common_config.AUTH_JWT_AUDIENCE, + issuer=opal_common_config.AUTH_JWT_ISSUER, + ) + if not verifier.enabled: + logger.info( + "API authentication disabled (public encryption key was not provided)" + ) + + return verifier diff --git a/packages/opal-common/opal_common/authentication/authz.py b/packages/opal-common/opal_common/authentication/authz.py index 742304bf5..822497e64 100644 --- a/packages/opal-common/opal_common/authentication/authz.py +++ b/packages/opal-common/opal_common/authentication/authz.py @@ -1,4 +1,4 @@ -from opal_common.authentication.deps import JWTAuthenticator +from opal_common.authentication.authenticator import Authenticator from opal_common.authentication.types import JWTClaims from opal_common.authentication.verifier import Unauthorized from opal_common.schemas.data import DataUpdate @@ -6,7 +6,7 @@ def require_peer_type( - authenticator: JWTAuthenticator, claims: JWTClaims, required_type: PeerType + authenticator: Authenticator, claims: JWTClaims, required_type: PeerType ): if not authenticator.enabled: return @@ -28,7 +28,7 @@ def require_peer_type( def restrict_optional_topics_to_publish( - authenticator: JWTAuthenticator, claims: JWTClaims, update: DataUpdate + authenticator: Authenticator, claims: JWTClaims, update: DataUpdate ): if not authenticator.enabled: return diff --git a/packages/opal-common/opal_common/authentication/deps.py b/packages/opal-common/opal_common/authentication/deps.py index 2ec63043b..332a37cb7 100644 --- a/packages/opal-common/opal_common/authentication/deps.py +++ b/packages/opal-common/opal_common/authentication/deps.py @@ -4,6 +4,8 @@ from fastapi import Header from fastapi.exceptions import HTTPException from fastapi.security.utils import get_authorization_scheme_param +from opal_common.authentication.authenticator import Authenticator +from opal_common.authentication.signer import JWTSigner from opal_common.authentication.types import JWTClaims from opal_common.authentication.verifier import JWTVerifier, Unauthorized from opal_common.logger import logger @@ -67,7 +69,7 @@ def verify_logged_in(verifier: JWTVerifier, token: Optional[str]) -> JWTClaims: raise -class _JWTAuthenticator: +class _JWTAuthenticator(Authenticator): def __init__(self, verifier: JWTVerifier): self._verifier = verifier @@ -75,10 +77,16 @@ def __init__(self, verifier: JWTVerifier): def verifier(self) -> JWTVerifier: return self._verifier + def signer(self) -> Optional[JWTSigner]: + return self._verifier + @property def enabled(self) -> JWTVerifier: return self._verifier.enabled + async def authenticate(self, headers): + pass + class JWTAuthenticator(_JWTAuthenticator): """Bearer token authentication for http(s) api endpoints. diff --git a/packages/opal-common/opal_common/authentication/jwk.py b/packages/opal-common/opal_common/authentication/jwk.py new file mode 100644 index 000000000..e69e8e75a --- /dev/null +++ b/packages/opal-common/opal_common/authentication/jwk.py @@ -0,0 +1,49 @@ +import httpx + +import jwt +from cachetools import TTLCache +from opal_common.authentication.verifier import Unauthorized + +class JWKManager: + def __init__( + self, openid_configuration_url, jwt_algorithm, cache_maxsize, cache_ttl + ): + self._openid_configuration_url = openid_configuration_url + self._jwt_algorithm = jwt_algorithm + self._cache = TTLCache(maxsize=cache_maxsize, ttl=cache_ttl) + + def public_key(self, token): + header = jwt.get_unverified_header(token) + kid = header["kid"] + + public_key = self._cache.get(kid) + if public_key is None: + public_key = self._fetch_public_key(token) + self._cache[kid] = public_key + + return public_key + + def _fetch_public_key(self, token: str): + try: + return self._jwks_client().get_signing_key_from_jwt(token).key + except Exception: + raise Unauthorized(description="unknown JWT error") + + def _jwks_client(self): + oidc_config = self._openid_configuration() + signing_algorithms = oidc_config["id_token_signing_alg_values_supported"] + if self._jwt_algorithm.name not in signing_algorithms: + raise Unauthorized(description="unknown JWT algorithm") + if "jwks_uri" not in oidc_config: + raise Unauthorized(description="missing 'jwks_uri' property") + return jwt.PyJWKClient(oidc_config["jwks_uri"]) + + def _openid_configuration(self): + response = httpx.get(self._openid_configuration_url) + + if response.status_code != httpx.codes.OK: + raise Unauthorized( + description=f"invalid status code {response.status_code}" + ) + + return response.json() diff --git a/packages/opal-common/opal_common/authentication/oauth2.py b/packages/opal-common/opal_common/authentication/oauth2.py new file mode 100644 index 000000000..1e688d531 --- /dev/null +++ b/packages/opal-common/opal_common/authentication/oauth2.py @@ -0,0 +1,168 @@ +import asyncio +import time +from typing import Optional + +import httpx +from cachetools import TTLCache, cached +from fastapi import Header +from httpx import AsyncClient, BasicAuth +from opal_common.authentication.authenticator import Authenticator +from opal_common.authentication.deps import get_token_from_header +from opal_common.authentication.jwk import JWKManager +from opal_common.authentication.signer import JWTSigner +from opal_common.authentication.verifier import JWTVerifier, Unauthorized +from opal_common.config import opal_common_config + +class _OAuth2Authenticator(Authenticator): + async def authenticate(self, headers): + if "Authorization" not in headers: + token = await self.token() + headers["Authorization"] = f"Bearer {token}" + + +class OAuth2ClientCredentialsAuthenticator(_OAuth2Authenticator): + def __init__(self) -> None: + self._client_id = opal_common_config.OAUTH2_CLIENT_ID + self._client_secret = opal_common_config.OAUTH2_CLIENT_SECRET + self._token_url = opal_common_config.OAUTH2_TOKEN_URL + self._introspect_url = opal_common_config.OAUTH2_INTROSPECT_URL + self._jwt_algorithm = opal_common_config.OAUTH2_JWT_ALGORITHM + self._jwt_audience = opal_common_config.OAUTH2_JWT_AUDIENCE + self._jwt_issuer = opal_common_config.OAUTH2_JWT_ISSUER + self._jwk_manager = JWKManager( + opal_common_config.OAUTH2_OPENID_CONFIGURATION_URL, + opal_common_config.OAUTH2_JWT_ALGORITHM, + opal_common_config.OAUTH2_JWK_CACHE_MAXSIZE, + opal_common_config.OAUTH2_JWK_CACHE_TTL, + ) + + cfg = opal_common_config.OAUTH2_EXACT_MATCH_CLAIMS + if cfg is None: + self._exact_match_claims = {} + else: + self._exact_match_claims = dict(map(lambda x: x.split("="), cfg.split(","))) + + cfg = opal_common_config.OAUTH2_REQUIRED_CLAIMS + if cfg is None: + self._required_claims = [] + else: + self._required_claims = cfg.split(",") + + @property + def enabled(self): + return True + + def signer(self) -> Optional[JWTSigner]: + return None + + async def token(self): + auth = BasicAuth(self._client_id, self._client_secret) + data = {"grant_type": "client_credentials"} + + async with AsyncClient() as client: + response = await client.post(self._token_url, auth=auth, data=data) + return (response.json())["access_token"] + + def __call__(self, authorization: Optional[str] = Header(None)) -> {}: + token = get_token_from_header(authorization) + return self.verify(token) + + def verify(self, token: str) -> {}: + if self._introspect_url is not None: + claims = self._verify_opaque(token) + else: + claims = self._verify_jwt(token) + + self._verify_exact_match_claims(claims) + self._verify_required_claims(claims) + + return claims + + def _verify_opaque(self, token: str) -> {}: + response = httpx.post(self._introspect_url, data={"token": token}) + + if response.status_code != httpx.codes.OK: + raise Unauthorized( + description=f"invalid status code {response.status_code}" + ) + + claims = response.json() + active = claims.get("active", False) + if not active: + raise Unauthorized(description="inactive token") + + return claims or {} + + def _verify_jwt(self, token: str) -> {}: + public_key = self._jwk_manager.public_key(token) + + verifier = JWTVerifier( + public_key=public_key, + algorithm=self._jwt_algorithm, + audience=self._jwt_audience, + issuer=self._jwt_issuer, + ) + claims = verifier.verify(token) + + return claims or {} + + def _verify_exact_match_claims(self, claims): + for key, value in self._exact_match_claims.items(): + if key not in claims: + raise Unauthorized(description=f"missing required '{key}' claim") + elif claims[key] != value: + raise Unauthorized(description=f"invalid '{key}' claim value") + + def _verify_required_claims(self, claims): + for claim in self._required_claims: + if claim not in claims: + raise Unauthorized(description=f"missing required '{claim}' claim") + + +class CachedOAuth2Authenticator(_OAuth2Authenticator): + lock = asyncio.Lock() + + def __init__(self, delegate: OAuth2ClientCredentialsAuthenticator) -> None: + self._token = None + self._exp = None + self._exp_margin = opal_common_config.OAUTH2_EXP_MARGIN + self._delegate = delegate + + @property + def enabled(self): + return self._delegate.enabled + + def signer(self) -> Optional[JWTSigner]: + return self._delegate.signer() + + def _expired(self): + if self._token is None: + return True + + now = int(time.time()) + return now > self._exp - self._exp_margin + + async def token(self): + if not self._expired(): + return self._token + + async with CachedOAuth2Authenticator.lock: + if not self._expired(): + return self._token + + token = await self._delegate.token() + claims = self._delegate.verify(token) + + self._token = token + self._exp = claims["exp"] + + return self._token + + @cached( + cache=TTLCache( + maxsize=opal_common_config.OAUTH2_TOKEN_VERIFY_CACHE_MAXSIZE, + ttl=opal_common_config.OAUTH2_TOKEN_VERIFY_CACHE_TTL + ) + ) + def __call__(self, authorization: Optional[str] = Header(None)) -> {}: + return self._delegate(authorization) diff --git a/packages/opal-common/opal_common/config.py b/packages/opal-common/opal_common/config.py index b4f9f5b8a..ca2b5a1e1 100644 --- a/packages/opal-common/opal_common/config.py +++ b/packages/opal-common/opal_common/config.py @@ -191,6 +191,68 @@ class OpalCommonConfig(Confi): [".rego"], description="List of extensions to serve as policy modules", ) + AUTH_TYPE = confi.str( + "AUTH_TYPE", + None, + description="Authentication type. Available options are oauth2 for validating access token via either OAUTH2_INTROSPECT_URL or OPAL_OAUTH2_OPENID_CONFIGURATION_URL or anything else if you prefer OPAL to do the job.", + ) + OAUTH2_CLIENT_ID = confi.str( + "OAUTH2_CLIENT_ID", None, description="OAuth2 Client ID." + ) + OAUTH2_CLIENT_SECRET = confi.str( + "OAUTH2_CLIENT_SECRET", None, description="OAuth2 Client Secret." + ) + OAUTH2_TOKEN_URL = confi.str( + "OAUTH2_TOKEN_URL", None, description="OAuth2 Token URL." + ) + OAUTH2_INTROSPECT_URL = confi.str( + "OAUTH2_INTROSPECT_URL", None, description="OAuth2 introspect URL." + ) + OAUTH2_OPENID_CONFIGURATION_URL = confi.str( + "OAUTH2_OPENID_CONFIGURATION_URL", + None, + description="OAuth2 OpenID configuration URL.", + ) + OAUTH2_TOKEN_VERIFY_CACHE_MAXSIZE = confi.int( + "OAUTH2_TOKEN_VERIFY_CACHE_MAXSIZE", + 100, + description="OAuth2 token validation cache maxsize.", + ) + OAUTH2_TOKEN_VERIFY_CACHE_TTL = confi.int( + "OAUTH2_TOKEN_VERIFY_CACHE_TTL", + 5 * 60, + description="OAuth2 token validation cache TTL.", + ) + + OAUTH2_EXP_MARGIN = confi.int( + "OAUTH2_EXP_MARGIN", 5 * 60, description="OAuth2 expiration margin." + ) + OAUTH2_EXACT_MATCH_CLAIMS = confi.str( + "OAUTH2_EXACT_MATCH_CLAIMS", None, description="OAuth2 exact match claims." + ) + OAUTH2_REQUIRED_CLAIMS = confi.str( + "OAUTH2_REQUIRED_CLAIMS", + None, + description="Comma separated list of required claims.", + ) + OAUTH2_JWT_ALGORITHM = confi.enum( + "OAUTH2_JWT_ALGORITHM", + JWTAlgorithm, + getattr(JWTAlgorithm, "RS256"), + description="jwt algorithm, possible values: see: https://pyjwt.readthedocs.io/en/stable/algorithms.html", + ) + OAUTH2_JWT_AUDIENCE = confi.str( + "OAUTH2_JWT_AUDIENCE", None, description="OAuth2 required audience" + ) + OAUTH2_JWT_ISSUER = confi.str( + "OAUTH2_JWT_ISSUER", None, description="OAuth2 required issuer" + ) + OAUTH2_JWK_CACHE_MAXSIZE = confi.int( + "OAUTH2_JWK_CACHE_MAXSIZE", 100, description="OAuth2 JWKS cache maxsize." + ) + OAUTH2_JWK_CACHE_TTL = confi.int( + "OAUTH2_JWK_CACHE_TTL", 7 * 24 * 60 * 60, description="OAuth2 JWKS cache TTL." + ) ENABLE_METRICS = confi.bool( "ENABLE_METRICS", False, description="Enable metrics collection" diff --git a/packages/opal-common/opal_common/fetcher/providers/http_fetch_provider.py b/packages/opal-common/opal_common/fetcher/providers/http_fetch_provider.py index fc74223ed..0ca9354c0 100644 --- a/packages/opal-common/opal_common/fetcher/providers/http_fetch_provider.py +++ b/packages/opal-common/opal_common/fetcher/providers/http_fetch_provider.py @@ -1,10 +1,12 @@ """Simple HTTP get data fetcher using requests supports.""" from enum import Enum -from typing import Any, Union, cast +from typing import Any, Optional, Union, cast import httpx from aiohttp import ClientResponse, ClientSession +from opal_common.authentication.authenticator import Authenticator +from opal_common.authentication.authenticator_factory import AuthenticatorFactory from opal_common.config import opal_common_config from opal_common.fetcher.events import FetcherConfig, FetchEvent from opal_common.fetcher.fetch_provider import BaseFetchProvider @@ -52,6 +54,8 @@ class HttpFetchEvent(FetchEvent): class HttpFetchProvider(BaseFetchProvider): + _authenticator: Optional[Authenticator] = None + def __init__(self, event: HttpFetchEvent) -> None: self._event: HttpFetchEvent if event.config is None: @@ -64,6 +68,9 @@ def __init__(self, event: HttpFetchEvent) -> None: if self._custom_ssl_context is not None else {} ) + if HttpFetchProvider._authenticator is None: + HttpFetchProvider._authenticator = AuthenticatorFactory.create() + self._authenticator = HttpFetchProvider._authenticator def parse_event(self, event: FetchEvent) -> HttpFetchEvent: return HttpFetchEvent(**event.dict(exclude={"config"}), config=event.config) @@ -71,7 +78,10 @@ def parse_event(self, event: FetchEvent) -> HttpFetchEvent: async def __aenter__(self): headers = {} if self._event.config.headers is not None: - headers = self._event.config.headers + headers = self._event.config.headers.copy() + + await self._authenticator.authenticate(headers) + if opal_common_config.HTTP_FETCHER_PROVIDER_CLIENT == "httpx": self._session = httpx.AsyncClient(headers=headers) else: diff --git a/packages/opal-server/opal_server/authentication/__init__.py b/packages/opal-server/opal_server/authentication/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/packages/opal-server/opal_server/authentication/authenticator.py b/packages/opal-server/opal_server/authentication/authenticator.py new file mode 100644 index 000000000..def646382 --- /dev/null +++ b/packages/opal-server/opal_server/authentication/authenticator.py @@ -0,0 +1,18 @@ +from typing import Optional + +from fastapi import Header +from fastapi.exceptions import HTTPException +from opal_common.authentication.authenticator import Authenticator +from opal_common.authentication.types import JWTClaims +from opal_common.authentication.verifier import JWTVerifier, Unauthorized + + +class WebsocketServerAuthenticator(Authenticator): + def __init__(self, delegate: Authenticator) -> None: + self._delegate = delegate + + def __call__(self, authorization: Optional[str] = Header(None)) -> JWTClaims: + try: + return self._delegate(authorization) + except (Unauthorized, HTTPException): + return None diff --git a/packages/opal-server/opal_server/authentication/authenticator_factory.py b/packages/opal-server/opal_server/authentication/authenticator_factory.py new file mode 100644 index 000000000..43f4f092f --- /dev/null +++ b/packages/opal-server/opal_server/authentication/authenticator_factory.py @@ -0,0 +1,49 @@ +from opal_common.authentication.authenticator import Authenticator +from opal_common.authentication.deps import JWTAuthenticator +from opal_common.authentication.oauth2 import ( + CachedOAuth2Authenticator, + OAuth2ClientCredentialsAuthenticator, +) +from opal_common.authentication.signer import JWTSigner +from opal_common.config import opal_common_config +from opal_common.logger import logger +from opal_server.config import opal_server_config + +from .authenticator import WebsocketServerAuthenticator + + +class ServerAuthenticatorFactory: + @staticmethod + def create() -> Authenticator: + if opal_common_config.AUTH_TYPE == "oauth2": + logger.info( + "OPAL is running in secure mode - will verify API requests with OAuth2 tokens." + ) + return CachedOAuth2Authenticator(OAuth2ClientCredentialsAuthenticator()) + else: + return JWTAuthenticator(ServerAuthenticatorFactory.__signer()) + + @staticmethod + def __signer() -> JWTSigner: + signer = JWTSigner( + private_key=opal_server_config.AUTH_PRIVATE_KEY, + public_key=opal_common_config.AUTH_PUBLIC_KEY, + algorithm=opal_common_config.AUTH_JWT_ALGORITHM, + audience=opal_common_config.AUTH_JWT_AUDIENCE, + issuer=opal_common_config.AUTH_JWT_ISSUER, + ) + if signer.enabled: + logger.info( + "OPAL is running in secure mode - will verify API requests with JWT tokens." + ) + else: + logger.info( + "OPAL was not provided with JWT encryption keys, cannot verify api requests!" + ) + return signer + + +class WebsocketServerAuthenticatorFactory: + @staticmethod + def create() -> Authenticator: + return WebsocketServerAuthenticator(ServerAuthenticatorFactory.create()) diff --git a/packages/opal-server/opal_server/data/api.py b/packages/opal-server/opal_server/data/api.py index da5d043a9..6047e9c69 100644 --- a/packages/opal-server/opal_server/data/api.py +++ b/packages/opal-server/opal_server/data/api.py @@ -2,11 +2,12 @@ from fastapi import APIRouter, Depends, Header, HTTPException, status from fastapi.responses import RedirectResponse +from opal_common.authentication.authenticator import Authenticator from opal_common.authentication.authz import ( require_peer_type, restrict_optional_topics_to_publish, ) -from opal_common.authentication.deps import JWTAuthenticator, get_token_from_header +from opal_common.authentication.deps import get_token_from_header from opal_common.authentication.types import JWTClaims from opal_common.authentication.verifier import Unauthorized from opal_common.logger import logger @@ -25,7 +26,7 @@ def init_data_updates_router( data_update_publisher: DataUpdatePublisher, data_sources_config: ServerDataSourceConfig, - authenticator: JWTAuthenticator, + authenticator: Authenticator, ): router = APIRouter() diff --git a/packages/opal-server/opal_server/policy/webhook/api.py b/packages/opal-server/opal_server/policy/webhook/api.py index c19595ad2..ef54c81b4 100644 --- a/packages/opal-server/opal_server/policy/webhook/api.py +++ b/packages/opal-server/opal_server/policy/webhook/api.py @@ -3,7 +3,7 @@ from fastapi import APIRouter, Depends, Request, status from fastapi_websocket_pubsub.pub_sub_server import PubSubEndpoint -from opal_common.authentication.deps import JWTAuthenticator +from opal_common.authentication.authenticator import Authenticator from opal_common.logger import logger from opal_common.schemas.webhook import GitWebhookRequestParams from opal_server.config import PolicySourceTypes, opal_server_config @@ -15,7 +15,7 @@ def init_git_webhook_router( - pubsub_endpoint: PubSubEndpoint, authenticator: JWTAuthenticator + pubsub_endpoint: PubSubEndpoint, authenticator: Authenticator ): async def dummy_affected_repo_urls(request: Request) -> List[str]: return [] diff --git a/packages/opal-server/opal_server/pubsub.py b/packages/opal-server/opal_server/pubsub.py index c7a3b875e..a5139db92 100644 --- a/packages/opal-server/opal_server/pubsub.py +++ b/packages/opal-server/opal_server/pubsub.py @@ -21,13 +21,12 @@ WebSocketRpcEventNotifier, ) from fastapi_websocket_rpc import RpcChannel -from opal_common.authentication.deps import WebsocketJWTAuthenticator -from opal_common.authentication.signer import JWTSigner from opal_common.authentication.types import JWTClaims from opal_common.authentication.verifier import Unauthorized from opal_common.confi.confi import load_conf_if_none from opal_common.config import opal_common_config from opal_common.logger import logger +from opal_server.authentication.authenticator import WebsocketServerAuthenticator from opal_server.config import opal_server_config from pydantic import BaseModel from starlette.datastructures import QueryParams @@ -121,7 +120,11 @@ class PubSub: """Wrapper for the Pub/Sub channel used for both policy and data updates.""" - def __init__(self, signer: JWTSigner, broadcaster_uri: str = None): + def __init__( + self, + broadcaster_uri: str = None, + authenticator: Optional[WebsocketServerAuthenticator] = None, + ): """ Args: broadcaster_uri (str, optional): Which server/medium should the PubSub use for broadcasting. Defaults to BROADCAST_URI. @@ -159,7 +162,6 @@ def __init__(self, signer: JWTSigner, broadcaster_uri: str = None): not opal_server_config.BROADCAST_CONN_LOSS_BUGFIX_EXPERIMENT_ENABLED ), ) - authenticator = WebsocketJWTAuthenticator(signer) @self.api_router.get( "/pubsub_client_info", response_model=Dict[str, ClientInfo] diff --git a/packages/opal-server/opal_server/scopes/api.py b/packages/opal-server/opal_server/scopes/api.py index d54b1074c..f94863deb 100644 --- a/packages/opal-server/opal_server/scopes/api.py +++ b/packages/opal-server/opal_server/scopes/api.py @@ -16,12 +16,13 @@ from fastapi_websocket_pubsub import PubSubEndpoint from git import InvalidGitRepositoryError from opal_common.async_utils import run_sync +from opal_common.authentication.authenticator import Authenticator from opal_common.authentication.authz import ( require_peer_type, restrict_optional_topics_to_publish, ) from opal_common.authentication.casting import cast_private_key -from opal_common.authentication.deps import JWTAuthenticator, get_token_from_header +from opal_common.authentication.deps import get_token_from_header from opal_common.authentication.types import EncryptionKeyFormat, JWTClaims from opal_common.authentication.verifier import Unauthorized from opal_common.logger import logger @@ -78,7 +79,7 @@ def verify_private_key_or_throw(scope_in: Scope): def init_scope_router( scopes: ScopeRepository, - authenticator: JWTAuthenticator, + authenticator: Authenticator, pubsub_endpoint: PubSubEndpoint, ): router = APIRouter() diff --git a/packages/opal-server/opal_server/security/jwks.py b/packages/opal-server/opal_server/security/jwks.py index 8f19591b9..e128eb903 100644 --- a/packages/opal-server/opal_server/security/jwks.py +++ b/packages/opal-server/opal_server/security/jwks.py @@ -1,5 +1,6 @@ import json from pathlib import Path +from typing import Optional from fastapi import FastAPI from fastapi.staticfiles import StaticFiles @@ -11,7 +12,7 @@ class JwksStaticEndpoint: def __init__( self, - signer: JWTSigner, + signer: Optional[JWTSigner], jwks_url: str, jwks_static_dir: str, ): @@ -25,7 +26,7 @@ def configure_app(self, app: FastAPI): # get the jwks contents from the signer jwks_contents = {} - if self._signer.enabled: + if self._signer is not None and self._signer.enabled: jwk = json.loads(self._signer.get_jwk()) jwks_contents = {"keys": [jwk]} diff --git a/packages/opal-server/opal_server/server.py b/packages/opal-server/opal_server/server.py index f320484ea..397f28446 100644 --- a/packages/opal-server/opal_server/server.py +++ b/packages/opal-server/opal_server/server.py @@ -8,8 +8,8 @@ from fastapi import Depends, FastAPI from fastapi_websocket_pubsub.event_broadcaster import EventBroadcasterContextManager -from opal_common.authentication.deps import JWTAuthenticator, StaticBearerAuthenticator -from opal_common.authentication.signer import JWTSigner +from opal_common.authentication.authenticator import Authenticator +from opal_common.authentication.deps import StaticBearerAuthenticator from opal_common.confi.confi import load_conf_if_none from opal_common.config import opal_common_config from opal_common.logger import configure_logs, logger @@ -22,6 +22,11 @@ ServerSideTopicPublisher, TopicPublisher, ) +from opal_server.authentication.authenticator import WebsocketServerAuthenticator +from opal_server.authentication.authenticator_factory import ( + ServerAuthenticatorFactory, + WebsocketServerAuthenticatorFactory, +) from opal_server.config import opal_server_config from opal_server.data.api import init_data_updates_router from opal_server.data.data_update_publisher import DataUpdatePublisher @@ -49,7 +54,8 @@ def __init__( init_publisher: bool = None, data_sources_config: Optional[ServerDataSourceConfig] = None, broadcaster_uri: str = None, - signer: Optional[JWTSigner] = None, + authenticator: Optional[Authenticator] = None, + websocketAuthenticator: Optional[WebsocketServerAuthenticator] = None, enable_jwks_endpoint=True, jwks_url: str = None, jwks_static_dir: str = None, @@ -117,33 +123,26 @@ def __init__( self.broadcaster_uri = broadcaster_uri self.master_token = master_token - if signer is not None: - self.signer = signer - else: - self.signer = JWTSigner( - private_key=opal_server_config.AUTH_PRIVATE_KEY, - public_key=opal_common_config.AUTH_PUBLIC_KEY, - algorithm=opal_common_config.AUTH_JWT_ALGORITHM, - audience=opal_common_config.AUTH_JWT_AUDIENCE, - issuer=opal_common_config.AUTH_JWT_ISSUER, - ) - if self.signer.enabled: - logger.info( - "OPAL is running in secure mode - will verify API requests with JWT tokens." - ) + if authenticator is not None: + self.authenticator = authenticator else: - logger.info( - "OPAL was not provided with JWT encryption keys, cannot verify api requests!" - ) + self.authenticator = ServerAuthenticatorFactory.create() if enable_jwks_endpoint: self.jwks_endpoint = JwksStaticEndpoint( - signer=self.signer, jwks_url=jwks_url, jwks_static_dir=jwks_static_dir + signer=self.authenticator.signer(), + jwks_url=jwks_url, + jwks_static_dir=jwks_static_dir, ) else: self.jwks_endpoint = None - self.pubsub = PubSub(signer=self.signer, broadcaster_uri=broadcaster_uri) + _websocketAuthenticator = websocketAuthenticator + if _websocketAuthenticator is None: + _websocketAuthenticator = WebsocketServerAuthenticatorFactory.create() + self.pubsub = PubSub( + broadcaster_uri=broadcaster_uri, authenticator=_websocketAuthenticator + ) self.publisher: Optional[TopicPublisher] = None self.broadcast_keepalive: Optional[PeriodicPublisher] = None @@ -219,19 +218,19 @@ def _configure_monitoring(self): def _configure_api_routes(self, app: FastAPI): """Mounts the api routes on the app object.""" - authenticator = JWTAuthenticator(self.signer) - data_update_publisher: Optional[DataUpdatePublisher] = None if self.publisher is not None: data_update_publisher = DataUpdatePublisher(self.publisher) # Init api routers with required dependencies data_updates_router = init_data_updates_router( - data_update_publisher, self.data_sources_config, authenticator + data_update_publisher, self.data_sources_config, self.authenticator + ) + webhook_router = init_git_webhook_router( + self.pubsub.endpoint, self.authenticator ) - webhook_router = init_git_webhook_router(self.pubsub.endpoint, authenticator) security_router = init_security_router( - self.signer, StaticBearerAuthenticator(self.master_token) + self.authenticator.signer(), StaticBearerAuthenticator(self.master_token) ) statistics_router = init_statistics_router(self.opal_statistics) loadlimit_router = init_loadlimit_router(self.loadlimit_notation) @@ -240,7 +239,7 @@ def _configure_api_routes(self, app: FastAPI): app.include_router( bundles_router, tags=["Bundle Server"], - dependencies=[Depends(authenticator)], + dependencies=[Depends(self.authenticator)], ) app.include_router(data_updates_router, tags=["Data Updates"]) app.include_router(webhook_router, tags=["Github Webhook"]) @@ -249,22 +248,24 @@ def _configure_api_routes(self, app: FastAPI): app.include_router( self.pubsub.api_router, tags=["Pub/Sub"], - dependencies=[Depends(authenticator)], + dependencies=[Depends(self.authenticator)], ) app.include_router( statistics_router, tags=["Server Statistics"], - dependencies=[Depends(authenticator)], + dependencies=[Depends(self.authenticator)], ) app.include_router( loadlimit_router, tags=["Client Load Limiting"], - dependencies=[Depends(authenticator)], + dependencies=[Depends(self.authenticator)], ) if opal_server_config.SCOPES: app.include_router( - init_scope_router(self._scopes, authenticator, self.pubsub.endpoint), + init_scope_router( + self._scopes, self.authenticator, self.pubsub.endpoint + ), tags=["Scopes"], prefix="/scopes", ) diff --git a/packages/requires.txt b/packages/requires.txt index 4a98128ca..69f5974a1 100644 --- a/packages/requires.txt +++ b/packages/requires.txt @@ -13,3 +13,4 @@ fastapi-utils>=0.2.1,<1 setuptools>=70.0.0 # not directly required, pinned by Snyk to avoid a vulnerability anyio>=4.4.0 # not directly required, pinned by Snyk to avoid a vulnerability starlette>=0.40.0 # not directly required, pinned by Snyk to avoid a vulnerability +tls-cert-refresh-period \ No newline at end of file