diff --git a/airflow/providers_manager.py b/airflow/providers_manager.py index 9b39439384f56..d48e75f752392 100644 --- a/airflow/providers_manager.py +++ b/airflow/providers_manager.py @@ -1011,7 +1011,7 @@ def _import_hook( :param package_name: provider package - only needed in case connection_type is missing : return """ - from wtforms import BooleanField, IntegerField, PasswordField, StringField + from wtforms import BooleanField, IntegerField, PasswordField, SelectField, StringField, TextAreaField if connection_type is None and hook_class_name is None: raise ValueError("Either connection_type or hook_class_name must be set") @@ -1031,7 +1031,14 @@ def _import_hook( raise ValueError( f"Provider package name is not set when hook_class_name ({hook_class_name}) is used" ) - allowed_field_classes = [IntegerField, PasswordField, StringField, BooleanField] + allowed_field_classes = [ + IntegerField, + PasswordField, + StringField, + BooleanField, + TextAreaField, + SelectField, + ] hook_class: type[BaseHook] | None = _correctness_check(package_name, hook_class_name, provider_info) if hook_class is None: return None diff --git a/airflow/www/templates/airflow/conn_create.html b/airflow/www/templates/airflow/conn_create.html index ac92b967f7e34..fb3e188949b66 100644 --- a/airflow/www/templates/airflow/conn_create.html +++ b/airflow/www/templates/airflow/conn_create.html @@ -25,7 +25,6 @@ - {# required for codemirror #} diff --git a/docs/apache-airflow-providers-http/configurations-ref.rst b/docs/apache-airflow-providers-http/configurations-ref.rst new file mode 100644 index 0000000000000..5885c9d91b6e8 --- /dev/null +++ b/docs/apache-airflow-providers-http/configurations-ref.rst @@ -0,0 +1,18 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. include:: ../exts/includes/providers-configurations-ref.rst diff --git a/docs/apache-airflow-providers-http/connections/http.rst b/docs/apache-airflow-providers-http/connections/http.rst index 3d471f3ccf8d0..3bc3d951b5b5d 100644 --- a/docs/apache-airflow-providers-http/connections/http.rst +++ b/docs/apache-airflow-providers-http/connections/http.rst @@ -24,35 +24,114 @@ HTTP Connection The HTTP connection enables connections to HTTP services. -Authenticating with HTTP ------------------------- - -Login and Password authentication can be used along with any authentication method using headers. -Headers can be given in json format in the Extras field. - Default Connection IDs ---------------------- The HTTP operators and hooks use ``http_default`` by default. +Authentication +-------------- + + .. _auth_basic: + +Authenticating via Basic auth +............................. +The simplest way to authenticate is to specify a *Login* and *Password* in the +Connection. + +.. 
image:: /img/connection_username_password.png + +By default, when a *Login* or *Password* is provided, the HTTP operators and +Hooks will perform basic authentication via the +``requests.auth.HTTPBasicAuth`` class. + +Authenticating via Headers +.......................... +If :ref:`Basic authentication <auth_basic>` is not enough, you can also add +*Headers* to the requests performed by the HTTP operators and Hooks. + +Headers can be passed in JSON format in the *Headers* field: + +.. image:: /img/connection_headers.png + +.. note:: Login and Password authentication can be used along with custom Headers. + +Authenticating via Auth class +............................. +For more complex use cases, you can inject an Auth class into the HTTP operators +and Hooks via the *Auth type* setting. This is particularly useful when you +need token refresh or advanced authentication methods such as Kerberos or OAuth. + +.. image:: /img/connection_auth_type.png + +By default, only `requests Auth classes `_ +are available. But you can install any class based on ``requests.auth.AuthBase`` +into your Airflow instance (via pip install), and then specify those classes in the +``extra_auth_types`` :doc:`configuration setting<../configurations-ref>` to +make them available in the Connection UI. + +If the Auth class requires more than a *Username* and a *Password*, you can +pass extra keyword arguments with the *Auth kwargs* setting. + +Example with the ``HTTPKerberosAuth`` from `requests-kerberos `_: + +.. image:: /img/connection_auth_kwargs.png + +.. tip:: + + You probably don't need to write an entire custom HttpOperator or HttpHook + to customize the connection. Simply extend the ``requests.auth.AuthBase`` + class and configure a Connection with it. + Configuring the Connection -------------------------- +Via the Admin panel +................... + +Configuring the Connection via the Airflow Admin panel offers more +possibilities than via :ref:`environment variables <env-variable>`. + Login (optional) - Specify the login for the http service you would like to connect too. + The login (username) of the http service you would like to connect to. + If provided, the HttpHook performs Basic authentication by default. Password (optional) - Specify the password for the http service you would like to connect too. + The password of the http service you would like to connect to. + If provided, the HttpHook performs Basic authentication by default. Host (optional) Specify the entire url or the base of the url for the service. Port (optional) - Specify a port number if applicable. + A port number if applicable. Schema (optional) - Specify the service type etc: http/https. + The service type, e.g. http or https. +Auth type (optional) + Python class used by the HttpHook (and the underlying requests library) to + authenticate. If provided, the *Login* and *Password* are passed as the + first two arguments to this class. If *Login* and/or *Password* are provided + without any Auth type, the HttpHook will by default perform basic + authentication via the ``requests.auth.HTTPBasicAuth`` class. + + Extra classes can be added via the ``extra_auth_types`` + :doc:`configuration setting<../configurations-ref>`. + +Auth kwargs (optional) + Extra key-value parameters passed to the Auth type class. + +Headers (optional) + Extra key-value parameters added to the Headers in JSON format. + +Extras (optional - deprecated) + *Deprecated*: Specify headers in JSON format. + + .. _env-variable: + +Via environment variable +........................
Extras (optional) Specify headers and default requests parameters in json format. Following default requests parameters are taken into account: @@ -68,10 +147,9 @@ Extras (optional) When specifying the connection in environment variable you should specify it using URI syntax. -Note that all components of the URI should be URL-encoded. - -For example: +.. note:: All components of the URI should be **URL-encoded**. .. code-block:: bash + :caption: Example: - export AIRFLOW_CONN_HTTP_DEFAULT='http://username:password@servvice.com:80/https?headers=header' + export AIRFLOW_CONN_HTTP_DEFAULT='http://username:password@service.com:80/https?headers=header' diff --git a/docs/apache-airflow-providers-http/img/connection_auth_kwargs.png b/docs/apache-airflow-providers-http/img/connection_auth_kwargs.png new file mode 100644 index 0000000000000..7023c3a7a072f Binary files /dev/null and b/docs/apache-airflow-providers-http/img/connection_auth_kwargs.png differ diff --git a/docs/apache-airflow-providers-http/img/connection_auth_type.png b/docs/apache-airflow-providers-http/img/connection_auth_type.png new file mode 100644 index 0000000000000..52eb584e5ccf6 Binary files /dev/null and b/docs/apache-airflow-providers-http/img/connection_auth_type.png differ diff --git a/docs/apache-airflow-providers-http/img/connection_headers.png b/docs/apache-airflow-providers-http/img/connection_headers.png new file mode 100644 index 0000000000000..413e9bbb38864 Binary files /dev/null and b/docs/apache-airflow-providers-http/img/connection_headers.png differ diff --git a/docs/apache-airflow-providers-http/img/connection_username_password.png b/docs/apache-airflow-providers-http/img/connection_username_password.png new file mode 100644 index 0000000000000..6e36e77dd4cb4 Binary files /dev/null and b/docs/apache-airflow-providers-http/img/connection_username_password.png differ diff --git a/docs/apache-airflow-providers-http/index.rst b/docs/apache-airflow-providers-http/index.rst index 49745912f879b..7fc650ea8f50f 100644 --- a/docs/apache-airflow-providers-http/index.rst +++ b/docs/apache-airflow-providers-http/index.rst @@ -42,6 +42,7 @@ :maxdepth: 1 :caption: References + Configuration Python API <_api/airflow/providers/http/index> .. 
toctree:: diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 2b35ed5bc2dd9..7aae2582f24b1 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -25,6 +25,7 @@ afterall AgentKey aio aiobotocore +aiohttp AioSession aiplatform Airbnb diff --git a/providers/src/airflow/providers/apache/livy/hooks/livy.py b/providers/src/airflow/providers/apache/livy/hooks/livy.py index 3eec9599457fe..611634e85649c 100644 --- a/providers/src/airflow/providers/apache/livy/hooks/livy.py +++ b/providers/src/airflow/providers/apache/livy/hooks/livy.py @@ -81,6 +81,14 @@ class LivyHook(HttpHook, LoggingMixin): conn_type = "livy" hook_name = "Apache Livy" + @classmethod + def get_connection_form_widgets(cls) -> dict[str, Any]: + return super().get_connection_form_widgets() + + @classmethod + def get_ui_field_behaviour(cls) -> dict[str, Any]: + return super().get_ui_field_behaviour() + def __init__( self, livy_conn_id: str = default_conn_name, @@ -88,11 +96,9 @@ def __init__( extra_headers: dict[str, Any] | None = None, auth_type: Any | None = None, ) -> None: - super().__init__(http_conn_id=livy_conn_id) + super().__init__(http_conn_id=livy_conn_id, auth_type=auth_type) self.extra_headers = extra_headers or {} self.extra_options = extra_options or {} - if auth_type: - self.auth_type = auth_type def get_conn(self, headers: dict[str, Any] | None = None) -> Any: """ diff --git a/providers/src/airflow/providers/http/hooks/http.py b/providers/src/airflow/providers/http/hooks/http.py index b22a01f8283db..31e5fa7f3c22d 100644 --- a/providers/src/airflow/providers/http/hooks/http.py +++ b/providers/src/airflow/providers/http/hooks/http.py @@ -17,6 +17,11 @@ # under the License. from __future__ import annotations +import json +import warnings +from contextlib import suppress +from functools import cache +from json import JSONDecodeError from typing import TYPE_CHECKING, Any, Callable from urllib.parse import urlparse @@ -25,21 +30,33 @@ import tenacity from aiohttp import ClientResponseError from asgiref.sync import sync_to_async -from requests.auth import HTTPBasicAuth from requests.models import DEFAULT_REDIRECT_LIMIT from requests_toolbelt.adapters.socket_options import TCPKeepAliveAdapter -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning from airflow.hooks.base import BaseHook from airflow.providers.http.exceptions import HttpErrorException, HttpMethodException +from airflow.utils.module_loading import import_string if TYPE_CHECKING: from aiohttp.client_reqrep import ClientResponse from requests.adapters import HTTPAdapter + from requests.auth import AuthBase from airflow.models import Connection +DEFAULT_AUTH_TYPES = frozenset( + { + "requests.auth.HTTPBasicAuth", + "requests.auth.HTTPProxyAuth", + "requests.auth.HTTPDigestAuth", + "requests_kerberos.HTTPKerberosAuth", + "aiohttp.BasicAuth", + } +) + + def _url_from_endpoint(base_url: str | None, endpoint: str | None) -> str: """Combine base url with endpoint.""" if base_url and not base_url.endswith("/") and endpoint and not endpoint.startswith("/"): @@ -86,7 +103,7 @@ def __init__( self.method = method.upper() self.base_url: str = "" self._retry_obj: Callable[..., Any] - self._auth_type: Any = auth_type + self.auth_type: Any = auth_type # If no adapter is provided, use TCPKeepAliveAdapter (default behavior) self.adapter = adapter @@ -99,13 +116,68 @@ def __init__( else: self.keep_alive_adapter = None - @property - def auth_type(self): - 
return self._auth_type or HTTPBasicAuth + @classmethod + @cache + def get_auth_types(cls) -> frozenset[str]: + """ + Get comma-separated extra auth_types from airflow config. + + Those auth_types can then be used in Connection configuration. + """ + from airflow.configuration import conf + + auth_types = DEFAULT_AUTH_TYPES.copy() + extra_auth_types = conf.get("http", "extra_auth_types", fallback=None) + if extra_auth_types: + auth_types |= frozenset({field.strip() for field in extra_auth_types.split(",")}) + return auth_types - @auth_type.setter - def auth_type(self, v): - self._auth_type = v + @classmethod + def get_connection_form_widgets(cls) -> dict[str, Any]: + """Return connection widgets to add to the connection form.""" + from flask_appbuilder.fieldwidgets import BS3TextAreaFieldWidget, BS3TextFieldWidget, Select2Widget + from flask_babel import lazy_gettext + from wtforms.fields import BooleanField, SelectField, StringField, TextAreaField + + default_auth_type: str = "" + auth_types_choices = frozenset({default_auth_type}) | cls.get_auth_types() + + return { + "timeout": StringField(lazy_gettext("Timeout"), widget=BS3TextFieldWidget()), + "allow_redirects": BooleanField(lazy_gettext("Allow redirects"), default=True), + "proxies": TextAreaField(lazy_gettext("Proxies"), widget=BS3TextAreaFieldWidget()), + "stream": BooleanField(lazy_gettext("Stream"), default=False), + "verify": BooleanField(lazy_gettext("Verify"), default=True), + "trust_env": BooleanField(lazy_gettext("Trust env"), default=True), + "cert": StringField(lazy_gettext("Cert"), widget=BS3TextFieldWidget()), + "max_redirects": StringField( + lazy_gettext("Max redirects"), widget=BS3TextFieldWidget(), default=DEFAULT_REDIRECT_LIMIT + ), + "auth_type": SelectField( + lazy_gettext("Auth type"), + choices=[(clazz, clazz) for clazz in auth_types_choices], + widget=Select2Widget(), + default=default_auth_type, + ), + "auth_kwargs": TextAreaField(lazy_gettext("Auth kwargs"), widget=BS3TextAreaFieldWidget()), + "headers": TextAreaField( + lazy_gettext("Headers"), + widget=BS3TextAreaFieldWidget(), + description=( + "Warning: Passing headers parameters directly in 'Extra' field is deprecated, and " + "will be removed in a future version of the Http provider. Use the 'Headers' " + "field instead." + ), + ), + } + + @classmethod + def get_ui_field_behaviour(cls) -> dict[str, Any]: + """Return custom UI field behaviour for Hive Client Wrapper connection.""" + return { + "hidden_fields": ["extra"], + "relabeling": {}, + } # headers may be passed through directly or in the "extra" field in the connection # definition @@ -147,29 +219,89 @@ def _configure_session_from_auth( session.auth = self._extract_auth(connection) return session - def _extract_auth(self, connection: Connection) -> Any | None: - if connection.login: - return self.auth_type(connection.login, connection.password) - elif self._auth_type: - return self.auth_type() + def _load_conn_auth_type(self, module_name: str | None) -> Any: + """ + Load auth_type module from extra Connection parameters. + + Check if the auth_type module is listed in 'extra_auth_types' and load it. + This method protects against the execution of random modules. 
+ """ + if module_name: + if module_name in self.get_auth_types(): + try: + module = import_string(module_name) + self._is_auth_type_setup = True + self.log.info("Loaded auth_type: %s", module_name) + return module + except Exception as error: + self.log.error("Cannot import auth_type '%s' due to: %s", module_name, error) + raise AirflowException(error) + self.log.warning( + "Skipping import of auth_type '%s'. The class should be listed in " + "'extra_auth_types' config of the http provider.", + module_name, + ) + return None + + def _extract_auth(self, connection: Connection) -> AuthBase | None: + extra = connection.extra_dejson + auth_type: Any = self.auth_type or self._load_conn_auth_type(module_name=extra.get("auth_type")) + auth_kwargs = extra.get("auth_kwargs", {}) + + self.log.debug("auth_type: %s", auth_type) + self.log.debug("auth_kwargs: %s", auth_kwargs) + + if auth_type: + auth_args: list[str | None] = [connection.login, connection.password] + + self.log.debug("auth_args: %s", auth_args) + + if any(auth_args): + if auth_kwargs: + return auth_type(*auth_args, **auth_kwargs) + else: + return auth_type(*auth_args) + else: + return auth_type() return None def _configure_session_from_extra( self, session: requests.Session, connection: Connection ) -> requests.Session: + # TODO: once http provider depends on Airflow 2.10.0, use get_extra_dejson(True) instead extra = connection.extra_dejson extra.pop("timeout", None) extra.pop("allow_redirects", None) + extra.pop("auth_type", None) + extra.pop("auth_kwargs", None) + headers = extra.pop("headers", {}) + + # TODO: once http provider depends on Airflow 2.10.0, we can remove this check below + if isinstance(headers, str): + with suppress(JSONDecodeError): + headers = json.loads(headers) + session.proxies = extra.pop("proxies", extra.pop("proxy", {})) session.stream = extra.pop("stream", False) session.verify = extra.pop("verify", extra.pop("verify_ssl", True)) session.cert = extra.pop("cert", None) session.max_redirects = extra.pop("max_redirects", DEFAULT_REDIRECT_LIMIT) session.trust_env = extra.pop("trust_env", True) + + if extra: + warnings.warn( + "Passing headers parameters directly in 'Extra' field is deprecated, and " + "will be removed in a future version of the Http provider. Use the 'Headers' " + "field instead.", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) + headers = {**extra, **headers} + try: - session.headers.update(extra) + session.headers.update(headers) except TypeError: - self.log.warning("Connection to %s has invalid extra field.", connection.host) + self.log.warning("Connection to %s has invalid headers field.", connection.host) return session def _configure_session_from_mount_adapters(self, session: requests.Session) -> requests.Session: diff --git a/providers/src/airflow/providers/http/provider.yaml b/providers/src/airflow/providers/http/provider.yaml index 09ab1113fa4ca..0691ec96913bc 100644 --- a/providers/src/airflow/providers/http/provider.yaml +++ b/providers/src/airflow/providers/http/provider.yaml @@ -103,3 +103,17 @@ triggers: connection-types: - hook-class-name: airflow.providers.http.hooks.http.HttpHook connection-type: http + +config: + http: + description: "Options for Http provider." + options: + extra_auth_types: + description: | + A comma-separated list of auth_type classes, which can be used to + configure Http Connections in Airflow's UI. This list restricts which + classes can be imported, to prevent arbitrary dependency injection.
+ type: string + version_added: 4.8.0 + example: "requests_kerberos.HTTPKerberosAuth,any.other.custom.HTTPAuth" + default: ~ diff --git a/providers/src/airflow/providers/http/triggers/http.py b/providers/src/airflow/providers/http/triggers/http.py index d25d3a55cfb5b..d30f41990f5b0 100644 --- a/providers/src/airflow/providers/http/triggers/http.py +++ b/providers/src/airflow/providers/http/triggers/http.py @@ -180,7 +180,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]: ) async def run(self) -> AsyncIterator[TriggerEvent]: - """Make a series of asynchronous http calls via an http hook.""" + """Make a series of asynchronous http calls via a http hook.""" hook = self._get_async_hook() while True: try: @@ -193,7 +193,6 @@ async def run(self) -> AsyncIterator[TriggerEvent]: extra_options=self.extra_options, ) yield TriggerEvent(True) - return except AirflowException as exc: if str(exc).startswith("404"): await asyncio.sleep(self.poke_interval) diff --git a/providers/tests/http/hooks/test_http.py b/providers/tests/http/hooks/test_http.py index 82a1ff9765156..0f9d0bb240518 100644 --- a/providers/tests/http/hooks/test_http.py +++ b/providers/tests/http/hooks/test_http.py @@ -22,6 +22,7 @@ import json import logging import os +import warnings from http import HTTPStatus from unittest import mock @@ -34,7 +35,7 @@ from requests.auth import AuthBase, HTTPBasicAuth from requests.models import DEFAULT_REDIRECT_LIMIT -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning from airflow.models import Connection from airflow.providers.http.hooks.http import HttpAsyncHook, HttpHook @@ -81,12 +82,14 @@ def setup_method(self): self.post_hook = HttpHook(method="POST") def test_raise_for_status_with_200(self, requests_mock): - requests_mock.get( - "http://test:8080/v1/test", status_code=200, text='{"status":{"status": 200}}', reason="OK" - ) - with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - resp = self.get_hook.run("v1/test") - assert resp.text == '{"status":{"status": 200}}' + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning) + requests_mock.get( + "http://test:8080/v1/test", status_code=200, text='{"status":{"status": 200}}', reason="OK" + ) + with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): + resp = self.get_hook.run("v1/test") + assert resp.text == '{"status":{"status": 200}}' @mock.patch("requests.Request") @mock.patch("requests.Session") @@ -108,100 +111,114 @@ def test_get_request_with_port(self, mock_session, mock_request): mock_request.reset_mock() def test_get_request_do_not_raise_for_status_if_check_response_is_false(self, requests_mock): - requests_mock.get( - "http://test:8080/v1/test", - status_code=404, - text='{"status":{"status": 404}}', - reason="Bad request", - ) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning) + requests_mock.get( + "http://test:8080/v1/test", + status_code=404, + text='{"status":{"status": 404}}', + reason="Bad request", + ) - with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - resp = self.get_hook.run("v1/test", extra_options={"check_response": False}) - assert resp.text == '{"status":{"status": 404}}' + with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): + resp = 
self.get_hook.run("v1/test", extra_options={"check_response": False}) + assert resp.text == '{"status":{"status": 404}}' def test_hook_contains_header_from_extra_field(self): with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - expected_conn = get_airflow_connection() - conn = self.get_hook.get_conn() - assert dict(conn.headers, **json.loads(expected_conn.extra)) == conn.headers - assert conn.headers.get("bearer") == "test" + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning) + expected_conn = get_airflow_connection() + conn = self.get_hook.get_conn() + assert dict(conn.headers, **json.loads(expected_conn.extra)) == conn.headers + assert conn.headers.get("bearer") == "test" def test_hook_ignore_max_redirects_from_extra_field_as_header(self): airflow_connection = get_airflow_connection_with_extra(extra={"bearer": "test", "max_redirects": 3}) with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=airflow_connection): - expected_conn = airflow_connection() - conn = self.get_hook.get_conn() - assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers - assert conn.headers.get("bearer") == "test" - assert conn.headers.get("allow_redirects") is None - assert conn.proxies == {} - assert conn.stream is False - assert conn.verify is True - assert conn.cert is None - assert conn.max_redirects == 3 - assert conn.trust_env is True + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning) + expected_conn = airflow_connection() + conn = self.get_hook.get_conn() + assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers + assert conn.headers.get("bearer") == "test" + assert conn.headers.get("allow_redirects") is None + assert conn.proxies == {} + assert conn.stream is False + assert conn.verify is True + assert conn.cert is None + assert conn.max_redirects == 3 + assert conn.trust_env is True def test_hook_ignore_proxies_from_extra_field_as_header(self): airflow_connection = get_airflow_connection_with_extra( extra={"bearer": "test", "proxies": {"http": "http://proxy:80", "https": "https://proxy:80"}} ) with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=airflow_connection): - expected_conn = airflow_connection() - conn = self.get_hook.get_conn() - assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers - assert conn.headers.get("bearer") == "test" - assert conn.headers.get("proxies") is None - assert conn.proxies == {"http": "http://proxy:80", "https": "https://proxy:80"} - assert conn.stream is False - assert conn.verify is True - assert conn.cert is None - assert conn.max_redirects == DEFAULT_REDIRECT_LIMIT - assert conn.trust_env is True + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning) + expected_conn = airflow_connection() + conn = self.get_hook.get_conn() + assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers + assert conn.headers.get("bearer") == "test" + assert conn.headers.get("proxies") is None + assert conn.proxies == {"http": "http://proxy:80", "https": "https://proxy:80"} + assert conn.stream is False + assert conn.verify is True + assert conn.cert is None + assert conn.max_redirects == DEFAULT_REDIRECT_LIMIT + assert conn.trust_env is True def test_hook_ignore_verify_from_extra_field_as_header(self): airflow_connection = 
get_airflow_connection_with_extra(extra={"bearer": "test", "verify": False}) with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=airflow_connection): - expected_conn = airflow_connection() - conn = self.get_hook.get_conn() - assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers - assert conn.headers.get("bearer") == "test" - assert conn.headers.get("verify") is None - assert conn.proxies == {} - assert conn.stream is False - assert conn.verify is False - assert conn.cert is None - assert conn.max_redirects == DEFAULT_REDIRECT_LIMIT - assert conn.trust_env is True + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning) + expected_conn = airflow_connection() + conn = self.get_hook.get_conn() + assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers + assert conn.headers.get("bearer") == "test" + assert conn.headers.get("verify") is None + assert conn.proxies == {} + assert conn.stream is False + assert conn.verify is False + assert conn.cert is None + assert conn.max_redirects == DEFAULT_REDIRECT_LIMIT + assert conn.trust_env is True def test_hook_ignore_cert_from_extra_field_as_header(self): airflow_connection = get_airflow_connection_with_extra(extra={"bearer": "test", "cert": "cert.crt"}) with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=airflow_connection): - expected_conn = airflow_connection() - conn = self.get_hook.get_conn() - assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers - assert conn.headers.get("bearer") == "test" - assert conn.headers.get("cert") is None - assert conn.proxies == {} - assert conn.stream is False - assert conn.verify is True - assert conn.cert == "cert.crt" - assert conn.max_redirects == DEFAULT_REDIRECT_LIMIT - assert conn.trust_env is True + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning) + expected_conn = airflow_connection() + conn = self.get_hook.get_conn() + assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers + assert conn.headers.get("bearer") == "test" + assert conn.headers.get("cert") is None + assert conn.proxies == {} + assert conn.stream is False + assert conn.verify is True + assert conn.cert == "cert.crt" + assert conn.max_redirects == DEFAULT_REDIRECT_LIMIT + assert conn.trust_env is True def test_hook_ignore_trust_env_from_extra_field_as_header(self): airflow_connection = get_airflow_connection_with_extra(extra={"bearer": "test", "trust_env": False}) with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=airflow_connection): - expected_conn = airflow_connection() - conn = self.get_hook.get_conn() - assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers - assert conn.headers.get("bearer") == "test" - assert conn.headers.get("cert") is None - assert conn.proxies == {} - assert conn.stream is False - assert conn.verify is True - assert conn.cert is None - assert conn.max_redirects == DEFAULT_REDIRECT_LIMIT - assert conn.trust_env is False + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning) + expected_conn = airflow_connection() + conn = self.get_hook.get_conn() + assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers + assert conn.headers.get("bearer") == "test" + assert conn.headers.get("cert") is None + assert conn.proxies == {} + assert conn.stream is False + assert 
conn.verify is True + assert conn.cert is None + assert conn.max_redirects == DEFAULT_REDIRECT_LIMIT + assert conn.trust_env is False @mock.patch("requests.Request") def test_hook_with_method_in_lowercase(self, mock_requests): @@ -227,8 +244,10 @@ def test_hook_has_no_header_from_extra(self): def test_hooks_header_from_extra_is_overridden(self): with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - conn = self.get_hook.get_conn(headers={"bearer": "newT0k3n"}) - assert conn.headers.get("bearer") == "newT0k3n" + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning) + conn = self.get_hook.get_conn(headers={"bearer": "newT0k3n"}) + assert conn.headers.get("bearer") == "newT0k3n" def test_post_request(self, requests_mock): requests_mock.post( @@ -236,8 +255,10 @@ def test_post_request(self, requests_mock): ) with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - resp = self.post_hook.run("v1/test") - assert resp.status_code == 200 + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning) + resp = self.post_hook.run("v1/test") + assert resp.status_code == 200 def test_post_request_with_error_code(self, requests_mock): requests_mock.post( @@ -249,7 +270,9 @@ def test_post_request_with_error_code(self, requests_mock): with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): with pytest.raises(AirflowException): - self.post_hook.run("v1/test") + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning) + self.post_hook.run("v1/test") def test_post_request_do_not_raise_for_status_if_check_response_is_false(self, requests_mock): requests_mock.post( @@ -260,8 +283,10 @@ def test_post_request_do_not_raise_for_status_if_check_response_is_false(self, r ) with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - resp = self.post_hook.run("v1/test", extra_options={"check_response": False}) - assert resp.status_code == 418 + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning) + resp = self.post_hook.run("v1/test", extra_options={"check_response": False}) + assert resp.status_code == 418 @pytest.mark.db_test @mock.patch("airflow.providers.http.hooks.http.requests.Session") @@ -291,8 +316,10 @@ def test_run_with_advanced_retry(self, requests_mock): reraise=True, ) with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - response = self.get_hook.run_with_advanced_retry(endpoint="v1/test", _retry_args=retry_args) - assert isinstance(response, requests.Response) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning) + response = self.get_hook.run_with_advanced_retry(endpoint="v1/test", _retry_args=retry_args) + assert isinstance(response, requests.Response) def test_header_from_extra_and_run_method_are_merged(self): def run_and_return(unused_session, prepped_request, unused_extra_options, **kwargs): @@ -303,10 +330,12 @@ def run_and_return(unused_session, prepped_request, unused_extra_options, **kwar "airflow.providers.http.hooks.http.HttpHook.run_and_check", side_effect=run_and_return ): with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - prepared_request = 
self.get_hook.run("v1/test", headers={"some_other_header": "test"}) - actual = dict(prepared_request.headers) - assert actual.get("bearer") == "test" - assert actual.get("some_other_header") == "test" + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning) + prepared_request = self.get_hook.run("v1/test", headers={"some_other_header": "test"}) + actual = dict(prepared_request.headers) + assert actual.get("bearer") == "test" + assert actual.get("some_other_header") == "test" @mock.patch("airflow.providers.http.hooks.http.HttpHook.get_connection") def test_http_connection(self, mock_get_connection): @@ -362,8 +391,10 @@ def match_obj1(request): requests_mock.request(method=method, url="//test:8080/v1/test", additional_matcher=match_obj1) with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - # will raise NoMockAddress exception if obj1 != request.json() - HttpHook(method=method).run("v1/test", json=obj1) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning) + # will raise NoMockAddress exception if obj1 != request.json() + HttpHook(method=method).run("v1/test", json=obj1) @mock.patch("airflow.providers.http.hooks.http.requests.Session.send") def test_verify_set_to_true_by_default(self, mock_session_send): @@ -438,35 +469,43 @@ def test_verify_false_parameter_overwrites_set_requests_ca_bundle_env_var(self, def test_connection_success(self, requests_mock): requests_mock.get("http://test:8080", status_code=200, json={"status": {"status": 200}}, reason="OK") with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - status, msg = self.get_hook.test_connection() - assert status is True - assert msg == "Connection successfully tested" + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning) + status, msg = self.get_hook.test_connection() + assert status is True + assert msg == "Connection successfully tested" def test_connection_failure(self, requests_mock): requests_mock.get( "http://test:8080", status_code=500, json={"message": "internal server error"}, reason="NOT_OK" ) with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - status, msg = self.get_hook.test_connection() - assert status is False - assert msg == "500:NOT_OK" + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning) + status, msg = self.get_hook.test_connection() + assert status is False + assert msg == "500:NOT_OK" @mock.patch("requests.auth.AuthBase.__init__") def test_loginless_custom_auth_initialized_with_no_args(self, auth): with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - auth.return_value = None - hook = HttpHook("GET", "http_default", AuthBase) - hook.get_conn() - auth.assert_called_once_with() + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning) + auth.return_value = None + hook = HttpHook("GET", "http_default", AuthBase) + hook.get_conn() + auth.assert_called_once_with() @mock.patch("requests.auth.AuthBase.__init__") def test_loginless_custom_auth_initialized_with_args(self, auth): with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - auth.return_value = None - auth_with_args = functools.partial(AuthBase, "test_arg") 
- hook = HttpHook("GET", "http_default", auth_with_args) - hook.get_conn() - auth.assert_called_once_with("test_arg") + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning) + auth.return_value = None + auth_with_args = functools.partial(AuthBase, "test_arg") + hook = HttpHook("GET", "http_default", auth_with_args) + hook.get_conn() + auth.assert_called_once_with("test_arg") @mock.patch("requests.auth.HTTPBasicAuth.__init__") def test_login_password_basic_auth_initialized(self, auth): @@ -474,18 +513,22 @@ def test_login_password_basic_auth_initialized(self, auth): "airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection_with_login_and_password, ): - auth.return_value = None - hook = HttpHook("GET", "http_default", HTTPBasicAuth) - hook.get_conn() - auth.assert_called_once_with("username", "pass") + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning) + auth.return_value = None + hook = HttpHook("GET", "http_default", HTTPBasicAuth) + hook.get_conn() + auth.assert_called_once_with("username", "pass") @mock.patch("requests.auth.HTTPBasicAuth.__init__") def test_default_auth_not_initialized(self, auth): with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - auth.return_value = None - hook = HttpHook("GET", "http_default") - hook.get_conn() - auth.assert_not_called() + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning) + auth.return_value = None + hook = HttpHook("GET", "http_default") + hook.get_conn() + auth.assert_not_called() def test_keep_alive_enabled(self): with ( diff --git a/providers/tests/http/sensors/test_http.py b/providers/tests/http/sensors/test_http.py index 78a11e15bb7c1..47af8f49c48cf 100644 --- a/providers/tests/http/sensors/test_http.py +++ b/providers/tests/http/sensors/test_http.py @@ -238,10 +238,14 @@ def resp_check(_): class FakeSession: + """Mock requests.Session object.""" + def __init__(self): self.response = requests.Response() self.response.status_code = 200 self.response._content = "apache/airflow".encode("ascii", "ignore") + self.headers = {} + self.auth = None def send(self, *args, **kwargs): return self.response diff --git a/tests/www/views/test_views_connection.py b/tests/www/views/test_views_connection.py index f9a4efd11c15b..19a36c0ac6b39 100644 --- a/tests/www/views/test_views_connection.py +++ b/tests/www/views/test_views_connection.py @@ -459,8 +459,12 @@ def test_process_form_invalid_extra_removed(admin_client): """ Test that when an invalid json `extra` is passed in the form, it is removed and _not_ saved over the existing extras. + + Note: This can only be tested with a Hook which does not have any custom fields (otherwise + the custom fields override the extra data when editing a Connection). Thus, this is currently + tested with ftp. """ - conn_details = {"conn_id": "test_conn", "conn_type": "http"} + conn_details = {"conn_id": "test_conn", "conn_type": "ftp"} conn = Connection(**conn_details, extra='{"foo": "bar"}') conn.id = 1
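As a rough sketch of the workflow this change enables (not part of the patch itself), a custom auth class only needs to derive from ``requests.auth.AuthBase``, be importable on the Airflow instance, and be listed in the ``extra_auth_types`` option of the ``[http]`` config section; per ``_extract_auth`` above, *Login* and *Password* are passed as the first two positional arguments and *Auth kwargs* as keyword arguments. The module path ``my_company.auth``, the token endpoint and the field names below are hypothetical placeholders.

.. code-block:: python

    # my_company/auth.py -- hypothetical module installed alongside Airflow (e.g. via pip)
    from __future__ import annotations

    import requests
    from requests.auth import AuthBase


    class TokenAuth(AuthBase):
        """Fetch a bearer token with the Connection credentials and attach it to each request."""

        def __init__(self, username: str, password: str, token_url: str = "https://example.com/token"):
            # `username`/`password` come from the Connection Login/Password fields;
            # `token_url` can be overridden through the "Auth kwargs" field.
            self.username = username
            self.password = password
            self.token_url = token_url

        def __call__(self, request: requests.PreparedRequest) -> requests.PreparedRequest:
            # Invoked by `requests` for every request sent through the HttpHook session.
            # A real implementation would cache the token instead of fetching it each time.
            response = requests.post(
                self.token_url,
                data={"username": self.username, "password": self.password},
                timeout=10,
            )
            response.raise_for_status()
            request.headers["Authorization"] = f"Bearer {response.json()['access_token']}"
            return request

To surface this class in the connection form, it would then be listed (comma-separated with any others) as ``my_company.auth.TokenAuth`` under ``extra_auth_types`` in the ``[http]`` section of the Airflow configuration.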
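Similarly, when the Connection is managed outside the UI, the new ``auth_type``, ``auth_kwargs`` and ``headers`` keys read by ``_extract_auth`` and ``_configure_session_from_extra`` can be supplied through the Connection extras. A minimal sketch, reusing the hypothetical ``TokenAuth`` class above with illustrative host and credentials:

.. code-block:: python

    import json

    from airflow.models import Connection

    conn = Connection(
        conn_id="http_default",
        conn_type="http",
        host="https://service.com",
        login="username",
        password="password",
        extra=json.dumps(
            {
                "headers": {"X-Tenant": "my-tenant"},
                "auth_type": "my_company.auth.TokenAuth",
                "auth_kwargs": {"token_url": "https://auth.example.com/token"},
            }
        ),
    )
    # The serialized form can then be exported, e.g. as an AIRFLOW_CONN_* environment variable:
    print(conn.get_uri())

Note that ``auth_type`` from the extras is only honoured if the class is listed in the ``extra_auth_types`` configuration, and, per the hook logic above, ``auth_kwargs`` are only forwarded when a *Login* or *Password* is set.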