diff --git a/airflow/providers_manager.py b/airflow/providers_manager.py
index 9b39439384f56..d48e75f752392 100644
--- a/airflow/providers_manager.py
+++ b/airflow/providers_manager.py
@@ -1011,7 +1011,7 @@ def _import_hook(
:param package_name: provider package - only needed in case connection_type is missing
: return
"""
- from wtforms import BooleanField, IntegerField, PasswordField, StringField
+ from wtforms import BooleanField, IntegerField, PasswordField, SelectField, StringField, TextAreaField
if connection_type is None and hook_class_name is None:
raise ValueError("Either connection_type or hook_class_name must be set")
@@ -1031,7 +1031,14 @@ def _import_hook(
raise ValueError(
f"Provider package name is not set when hook_class_name ({hook_class_name}) is used"
)
- allowed_field_classes = [IntegerField, PasswordField, StringField, BooleanField]
+ allowed_field_classes = [
+ IntegerField,
+ PasswordField,
+ StringField,
+ BooleanField,
+ TextAreaField,
+ SelectField,
+ ]
hook_class: type[BaseHook] | None = _correctness_check(package_name, hook_class_name, provider_info)
if hook_class is None:
return None
diff --git a/airflow/www/templates/airflow/conn_create.html b/airflow/www/templates/airflow/conn_create.html
index ac92b967f7e34..fb3e188949b66 100644
--- a/airflow/www/templates/airflow/conn_create.html
+++ b/airflow/www/templates/airflow/conn_create.html
@@ -25,7 +25,6 @@
-
{# required for codemirror #}
diff --git a/docs/apache-airflow-providers-http/configurations-ref.rst b/docs/apache-airflow-providers-http/configurations-ref.rst
new file mode 100644
index 0000000000000..5885c9d91b6e8
--- /dev/null
+++ b/docs/apache-airflow-providers-http/configurations-ref.rst
@@ -0,0 +1,18 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+.. include:: ../exts/includes/providers-configurations-ref.rst
diff --git a/docs/apache-airflow-providers-http/connections/http.rst b/docs/apache-airflow-providers-http/connections/http.rst
index 3d471f3ccf8d0..3bc3d951b5b5d 100644
--- a/docs/apache-airflow-providers-http/connections/http.rst
+++ b/docs/apache-airflow-providers-http/connections/http.rst
@@ -24,35 +24,114 @@ HTTP Connection
The HTTP connection enables connections to HTTP services.
-Authenticating with HTTP
-------------------------
-
-Login and Password authentication can be used along with any authentication method using headers.
-Headers can be given in json format in the Extras field.
-
Default Connection IDs
----------------------
The HTTP operators and hooks use ``http_default`` by default.
+Authentication
+--------------
+
+ .. _auth_basic:
+
+Authenticating via Basic auth
+.............................
+The simplest way to authenticate is to specify a *Login* and *Password* in the
+Connection.
+
+.. image:: /img/connection_username_password.png
+
+By default, when a *Login* or *Password* is provided, the HTTP operators and
+Hooks perform basic authentication via the
+``requests.auth.HTTPBasicAuth`` class.
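+
+As an illustration, a minimal sketch of using the hook with such a Connection
+(assuming an ``http_default`` Connection with *Login* and *Password* set, and a
+hypothetical ``v1/test`` endpoint):
+
+.. code-block:: python
+
+    from airflow.providers.http.hooks.http import HttpHook
+
+    # With Login/Password set on the Connection, the hook attaches
+    # requests.auth.HTTPBasicAuth(login, password) to the underlying session.
+    hook = HttpHook(method="GET", http_conn_id="http_default")
+    response = hook.run(endpoint="v1/test")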
+
+Authenticating via Headers
+..........................
+If :ref:`Basic authentication <auth_basic>` is not enough, you can also add
+*Headers* to the requests performed by the HTTP operators and Hooks.
+
+Headers can be passed in JSON format in the *Headers* field:
+
+.. image:: /img/connection_headers.png
+
+.. note:: Login and Password authentication can be used along with custom Headers.
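+
+As an illustration, a minimal sketch of how Connection *Headers* combine with
+per-call headers (assuming an ``http_default`` Connection whose *Headers* field
+contains ``{"bearer": "test"}``; the endpoint is hypothetical):
+
+.. code-block:: python
+
+    from airflow.providers.http.hooks.http import HttpHook
+
+    hook = HttpHook(method="GET", http_conn_id="http_default")
+    # Headers from the Connection and headers passed here are merged onto the request.
+    response = hook.run(endpoint="v1/test", headers={"X-Extra-Header": "value"})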
+
+Authenticating via Auth class
+.............................
+For more complex use cases, you can inject an Auth class into the HTTP operators
+and Hooks via the *Auth type* setting. This is particularly useful when you
+need token refresh or advanced authentication methods such as Kerberos or OAuth.
+
+.. image:: /img/connection_auth_type.png
+
+By default, only `requests Auth classes `_
+are available, but you can install any class based on ``requests.auth.AuthBase``
+into your Airflow instance (via pip install), and then list those classes in the
+``extra_auth_types`` :doc:`configuration setting<../configurations-ref>` to
+make them available in the Connection UI.
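+
+As an illustration, the allowed class names (the defaults plus any extras from
+the configuration) can be inspected programmatically; a minimal sketch:
+
+.. code-block:: python
+
+    from airflow.providers.http.hooks.http import HttpHook
+
+    # Returns a frozenset of dotted-path strings, e.g. "requests.auth.HTTPBasicAuth",
+    # extended by the [http] extra_auth_types configuration option.
+    print(HttpHook.get_auth_types())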
+
+If the Auth class requires more than a *Username* and a *Password*, you can
+pass extra keyword arguments via the *Auth kwargs* setting.
+
+Example with ``HTTPKerberosAuth`` from `requests-kerberos `_:
+
+.. image:: /img/connection_auth_kwargs.png
+
+.. tip::
+
+ You probably don't need to write an entire custom HttpOperator or HttpHook
+ to customize authentication. Simply extend the ``requests.auth.AuthBase``
+ class and configure a Connection with it.
+
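+As an illustration, a minimal sketch of such a class (``TokenAuth`` and its
+``token_header`` argument are hypothetical; the class must also be listed in
+``extra_auth_types`` to be selectable in the UI). The Connection *Login* and
+*Password* arrive as the first two positional arguments, and *Auth kwargs* are
+passed as keyword arguments:
+
+.. code-block:: python
+
+    from requests.auth import AuthBase
+
+
+    class TokenAuth(AuthBase):
+        """Attach a static token header to every request."""
+
+        def __init__(self, username, password, token_header="X-Auth-Token"):
+            # Login and Password come from the Connection; token_header can be
+            # provided via the "Auth kwargs" field.
+            self.token = password
+            self.token_header = token_header
+
+        def __call__(self, request):
+            # Called by requests for every prepared request.
+            request.headers[self.token_header] = self.token
+            return request
+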
Configuring the Connection
--------------------------
+Via the Admin panel
+...................
+
+Configuring the Connection via the Airflow Admin panel offers more
+possibilities than via :ref:`environment variables <env-variable>`.
+
Login (optional)
- Specify the login for the http service you would like to connect too.
+ The login (username) of the http service you would like to connect to.
+ If provided, the HttpHook performs Basic authentication by default.
Password (optional)
- Specify the password for the http service you would like to connect too.
+ The password of the http service you would like to connect to.
+ If provided, the HttpHook performs Basic authentication by default.
Host (optional)
Specify the entire url or the base of the url for the service.
Port (optional)
- Specify a port number if applicable.
+ A port number if applicable.
Schema (optional)
- Specify the service type etc: http/https.
+ The service type, e.g. http or https.
+Auth type (optional)
+ Python class used by the HttpHook (and the underlying requests library) to
+ authenticate. If provided, the *Login* and *Password* are passed as the
+ first two arguments to this class. If *Login* and/or *Password* are provided
+ without any Auth type, the HttpHook will by default perform Basic
+ authentication via the ``requests.auth.HTTPBasicAuth`` class.
+
+ Extra classes can be added via the ``extra_auth_types``
+ :doc:`configuration setting<../configurations-ref>`.
+
+Auth kwargs (optional)
+ Extra key-value parameters passed to the Auth type class.
+
+Headers (optional)
+ Headers, in JSON format, added to the requests performed by the HttpHook.
+
+Extras (optional - deprecated)
+ *Deprecated*: Specify headers in JSON format. Use the *Headers* field instead.
+
+ .. _env-variable:
+
+Via environment variable
+........................
Extras (optional)
Specify headers and default requests parameters in json format.
Following default requests parameters are taken into account:
@@ -68,10 +147,9 @@ Extras (optional)
When specifying the connection in environment variable you should specify
it using URI syntax.
-Note that all components of the URI should be URL-encoded.
-
-For example:
+.. note:: All components of the URI should be **URL-encoded**.
.. code-block:: bash
+ :caption: Example:
- export AIRFLOW_CONN_HTTP_DEFAULT='http://username:password@servvice.com:80/https?headers=header'
+ export AIRFLOW_CONN_HTTP_DEFAULT='http://username:password@service.com:80/https?headers=header'
diff --git a/docs/apache-airflow-providers-http/img/connection_auth_kwargs.png b/docs/apache-airflow-providers-http/img/connection_auth_kwargs.png
new file mode 100644
index 0000000000000..7023c3a7a072f
Binary files /dev/null and b/docs/apache-airflow-providers-http/img/connection_auth_kwargs.png differ
diff --git a/docs/apache-airflow-providers-http/img/connection_auth_type.png b/docs/apache-airflow-providers-http/img/connection_auth_type.png
new file mode 100644
index 0000000000000..52eb584e5ccf6
Binary files /dev/null and b/docs/apache-airflow-providers-http/img/connection_auth_type.png differ
diff --git a/docs/apache-airflow-providers-http/img/connection_headers.png b/docs/apache-airflow-providers-http/img/connection_headers.png
new file mode 100644
index 0000000000000..413e9bbb38864
Binary files /dev/null and b/docs/apache-airflow-providers-http/img/connection_headers.png differ
diff --git a/docs/apache-airflow-providers-http/img/connection_username_password.png b/docs/apache-airflow-providers-http/img/connection_username_password.png
new file mode 100644
index 0000000000000..6e36e77dd4cb4
Binary files /dev/null and b/docs/apache-airflow-providers-http/img/connection_username_password.png differ
diff --git a/docs/apache-airflow-providers-http/index.rst b/docs/apache-airflow-providers-http/index.rst
index 49745912f879b..7fc650ea8f50f 100644
--- a/docs/apache-airflow-providers-http/index.rst
+++ b/docs/apache-airflow-providers-http/index.rst
@@ -42,6 +42,7 @@
:maxdepth: 1
:caption: References
+ Configuration <configurations-ref>
Python API <_api/airflow/providers/http/index>
.. toctree::
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 2b35ed5bc2dd9..7aae2582f24b1 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -25,6 +25,7 @@ afterall
AgentKey
aio
aiobotocore
+aiohttp
AioSession
aiplatform
Airbnb
diff --git a/providers/src/airflow/providers/apache/livy/hooks/livy.py b/providers/src/airflow/providers/apache/livy/hooks/livy.py
index 3eec9599457fe..611634e85649c 100644
--- a/providers/src/airflow/providers/apache/livy/hooks/livy.py
+++ b/providers/src/airflow/providers/apache/livy/hooks/livy.py
@@ -81,6 +81,14 @@ class LivyHook(HttpHook, LoggingMixin):
conn_type = "livy"
hook_name = "Apache Livy"
+ @classmethod
+ def get_connection_form_widgets(cls) -> dict[str, Any]:
+ return super().get_connection_form_widgets()
+
+ @classmethod
+ def get_ui_field_behaviour(cls) -> dict[str, Any]:
+ return super().get_ui_field_behaviour()
+
def __init__(
self,
livy_conn_id: str = default_conn_name,
@@ -88,11 +96,9 @@ def __init__(
extra_headers: dict[str, Any] | None = None,
auth_type: Any | None = None,
) -> None:
- super().__init__(http_conn_id=livy_conn_id)
+ super().__init__(http_conn_id=livy_conn_id, auth_type=auth_type)
self.extra_headers = extra_headers or {}
self.extra_options = extra_options or {}
- if auth_type:
- self.auth_type = auth_type
def get_conn(self, headers: dict[str, Any] | None = None) -> Any:
"""
diff --git a/providers/src/airflow/providers/http/hooks/http.py b/providers/src/airflow/providers/http/hooks/http.py
index b22a01f8283db..31e5fa7f3c22d 100644
--- a/providers/src/airflow/providers/http/hooks/http.py
+++ b/providers/src/airflow/providers/http/hooks/http.py
@@ -17,6 +17,11 @@
# under the License.
from __future__ import annotations
+import json
+import warnings
+from contextlib import suppress
+from functools import cache
+from json import JSONDecodeError
from typing import TYPE_CHECKING, Any, Callable
from urllib.parse import urlparse
@@ -25,21 +30,33 @@
import tenacity
from aiohttp import ClientResponseError
from asgiref.sync import sync_to_async
-from requests.auth import HTTPBasicAuth
from requests.models import DEFAULT_REDIRECT_LIMIT
from requests_toolbelt.adapters.socket_options import TCPKeepAliveAdapter
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.hooks.base import BaseHook
from airflow.providers.http.exceptions import HttpErrorException, HttpMethodException
+from airflow.utils.module_loading import import_string
if TYPE_CHECKING:
from aiohttp.client_reqrep import ClientResponse
from requests.adapters import HTTPAdapter
+ from requests.auth import AuthBase
from airflow.models import Connection
+DEFAULT_AUTH_TYPES = frozenset(
+ {
+ "requests.auth.HTTPBasicAuth",
+ "requests.auth.HTTPProxyAuth",
+ "requests.auth.HTTPDigestAuth",
+ "requests_kerberos.HTTPKerberosAuth",
+ "aiohttp.BasicAuth",
+ }
+)
+
+
def _url_from_endpoint(base_url: str | None, endpoint: str | None) -> str:
"""Combine base url with endpoint."""
if base_url and not base_url.endswith("/") and endpoint and not endpoint.startswith("/"):
@@ -86,7 +103,7 @@ def __init__(
self.method = method.upper()
self.base_url: str = ""
self._retry_obj: Callable[..., Any]
- self._auth_type: Any = auth_type
+ self.auth_type: Any = auth_type
# If no adapter is provided, use TCPKeepAliveAdapter (default behavior)
self.adapter = adapter
@@ -99,13 +116,68 @@ def __init__(
else:
self.keep_alive_adapter = None
- @property
- def auth_type(self):
- return self._auth_type or HTTPBasicAuth
+ @classmethod
+ @cache
+ def get_auth_types(cls) -> frozenset[str]:
+ """
+ Return the allowed auth_type classes: the defaults plus any extra classes
+ listed (comma-separated) in the ``[http] extra_auth_types`` config option.
+
+ Those auth_types can then be selected in the Connection configuration.
+ """
+ from airflow.configuration import conf
+
+ auth_types = DEFAULT_AUTH_TYPES.copy()
+ extra_auth_types = conf.get("http", "extra_auth_types", fallback=None)
+ if extra_auth_types:
+ auth_types |= frozenset({field.strip() for field in extra_auth_types.split(",")})
+ return auth_types
- @auth_type.setter
- def auth_type(self, v):
- self._auth_type = v
+ @classmethod
+ def get_connection_form_widgets(cls) -> dict[str, Any]:
+ """Return connection widgets to add to the connection form."""
+ from flask_appbuilder.fieldwidgets import BS3TextAreaFieldWidget, BS3TextFieldWidget, Select2Widget
+ from flask_babel import lazy_gettext
+ from wtforms.fields import BooleanField, SelectField, StringField, TextAreaField
+
+ default_auth_type: str = ""
+ auth_types_choices = frozenset({default_auth_type}) | cls.get_auth_types()
+
+ return {
+ "timeout": StringField(lazy_gettext("Timeout"), widget=BS3TextFieldWidget()),
+ "allow_redirects": BooleanField(lazy_gettext("Allow redirects"), default=True),
+ "proxies": TextAreaField(lazy_gettext("Proxies"), widget=BS3TextAreaFieldWidget()),
+ "stream": BooleanField(lazy_gettext("Stream"), default=False),
+ "verify": BooleanField(lazy_gettext("Verify"), default=True),
+ "trust_env": BooleanField(lazy_gettext("Trust env"), default=True),
+ "cert": StringField(lazy_gettext("Cert"), widget=BS3TextFieldWidget()),
+ "max_redirects": StringField(
+ lazy_gettext("Max redirects"), widget=BS3TextFieldWidget(), default=DEFAULT_REDIRECT_LIMIT
+ ),
+ "auth_type": SelectField(
+ lazy_gettext("Auth type"),
+ choices=[(clazz, clazz) for clazz in auth_types_choices],
+ widget=Select2Widget(),
+ default=default_auth_type,
+ ),
+ "auth_kwargs": TextAreaField(lazy_gettext("Auth kwargs"), widget=BS3TextAreaFieldWidget()),
+ "headers": TextAreaField(
+ lazy_gettext("Headers"),
+ widget=BS3TextAreaFieldWidget(),
+ description=(
+ "Warning: Passing headers parameters directly in 'Extra' field is deprecated, and "
+ "will be removed in a future version of the Http provider. Use the 'Headers' "
+ "field instead."
+ ),
+ ),
+ }
+
+ @classmethod
+ def get_ui_field_behaviour(cls) -> dict[str, Any]:
+ """Return custom UI field behaviour for Hive Client Wrapper connection."""
+ return {
+ "hidden_fields": ["extra"],
+ "relabeling": {},
+ }
# headers may be passed through directly or in the "extra" field in the connection
# definition
@@ -147,29 +219,89 @@ def _configure_session_from_auth(
session.auth = self._extract_auth(connection)
return session
- def _extract_auth(self, connection: Connection) -> Any | None:
- if connection.login:
- return self.auth_type(connection.login, connection.password)
- elif self._auth_type:
- return self.auth_type()
+ def _load_conn_auth_type(self, module_name: str | None) -> Any:
+ """
+ Load the auth_type class from the extra Connection parameters.
+
+ The class is imported only if it is listed in the allowed auth_types
+ (see ``get_auth_types``); this protects against importing arbitrary modules.
+ """
+ if module_name:
+ if module_name in self.get_auth_types():
+ try:
+ module = import_string(module_name)
+ self._is_auth_type_setup = True
+ self.log.info("Loaded auth_type: %s", module_name)
+ return module
+ except Exception as error:
+ self.log.error("Cannot import auth_type '%s' due to: %s", module_name, error)
+ raise AirflowException(error)
+ self.log.warning(
+ "Skipping import of auth_type '%s'. The class should be listed in "
+ "'extra_auth_types' config of the http provider.",
+ module_name,
+ )
+ return None
+
+ def _extract_auth(self, connection: Connection) -> AuthBase | None:
+ extra = connection.extra_dejson
+ auth_type: Any = self.auth_type or self._load_conn_auth_type(module_name=extra.get("auth_type"))
+ auth_kwargs = extra.get("auth_kwargs", {})
+
+ self.log.debug("auth_type: %s", auth_type)
+ self.log.debug("auth_kwargs: %s", auth_kwargs)
+
+ if auth_type:
+ auth_args: list[str | None] = [connection.login, connection.password]
+
+ self.log.debug("auth_args: %s", auth_args)
+
+ if any(auth_args):
+ if auth_kwargs:
+ return auth_type(*auth_args, **auth_kwargs)
+ else:
+ return auth_type(*auth_args)
+ else:
+ return auth_type()
return None
def _configure_session_from_extra(
self, session: requests.Session, connection: Connection
) -> requests.Session:
+ # TODO: once http provider depends on Airflow 2.10.0, use get_extra_dejson(True) instead
extra = connection.extra_dejson
extra.pop("timeout", None)
extra.pop("allow_redirects", None)
+ extra.pop("auth_type", None)
+ extra.pop("auth_kwargs", None)
+ headers = extra.pop("headers", {})
+
+ # TODO: once http provider depends on Airflow 2.10.0, we can remove this checked section below
+ if isinstance(headers, str):
+ with suppress(JSONDecodeError):
+ headers = json.loads(headers)
+
session.proxies = extra.pop("proxies", extra.pop("proxy", {}))
session.stream = extra.pop("stream", False)
session.verify = extra.pop("verify", extra.pop("verify_ssl", True))
session.cert = extra.pop("cert", None)
session.max_redirects = extra.pop("max_redirects", DEFAULT_REDIRECT_LIMIT)
session.trust_env = extra.pop("trust_env", True)
+
+ if extra:
+ warnings.warn(
+ "Passing headers parameters directly in 'Extra' field is deprecated, and "
+ "will be removed in a future version of the Http provider. Use the 'Headers' "
+ "field instead.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=2,
+ )
+ headers = {**extra, **headers}
+
try:
- session.headers.update(extra)
+ session.headers.update(headers)
except TypeError:
- self.log.warning("Connection to %s has invalid extra field.", connection.host)
+ self.log.warning("Connection to %s has invalid headers field.", connection.host)
return session
def _configure_session_from_mount_adapters(self, session: requests.Session) -> requests.Session:
diff --git a/providers/src/airflow/providers/http/provider.yaml b/providers/src/airflow/providers/http/provider.yaml
index 09ab1113fa4ca..0691ec96913bc 100644
--- a/providers/src/airflow/providers/http/provider.yaml
+++ b/providers/src/airflow/providers/http/provider.yaml
@@ -103,3 +103,17 @@ triggers:
connection-types:
- hook-class-name: airflow.providers.http.hooks.http.HttpHook
connection-type: http
+
+config:
+ http:
+ description: "Options for Http provider."
+ options:
+ extra_auth_types:
+ description: |
+ A comma-separated list of auth_type classes, which can be used to
+ configure Http Connections in Airflow's UI. This list restricts which
+ classes can be imported, to prevent arbitrary code execution.
+ type: string
+ version_added: 4.8.0
+ example: "requests_kerberos.HTTPKerberosAuth,any.other.custom.HTTPAuth"
+ default: ~
diff --git a/providers/src/airflow/providers/http/triggers/http.py b/providers/src/airflow/providers/http/triggers/http.py
index d25d3a55cfb5b..d30f41990f5b0 100644
--- a/providers/src/airflow/providers/http/triggers/http.py
+++ b/providers/src/airflow/providers/http/triggers/http.py
@@ -180,7 +180,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
)
async def run(self) -> AsyncIterator[TriggerEvent]:
- """Make a series of asynchronous http calls via an http hook."""
+ """Make a series of asynchronous http calls via a http hook."""
hook = self._get_async_hook()
while True:
try:
@@ -193,7 +193,6 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
extra_options=self.extra_options,
)
yield TriggerEvent(True)
- return
except AirflowException as exc:
if str(exc).startswith("404"):
await asyncio.sleep(self.poke_interval)
diff --git a/providers/tests/http/hooks/test_http.py b/providers/tests/http/hooks/test_http.py
index 82a1ff9765156..0f9d0bb240518 100644
--- a/providers/tests/http/hooks/test_http.py
+++ b/providers/tests/http/hooks/test_http.py
@@ -22,6 +22,7 @@
import json
import logging
import os
+import warnings
from http import HTTPStatus
from unittest import mock
@@ -34,7 +35,7 @@
from requests.auth import AuthBase, HTTPBasicAuth
from requests.models import DEFAULT_REDIRECT_LIMIT
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.models import Connection
from airflow.providers.http.hooks.http import HttpAsyncHook, HttpHook
@@ -81,12 +82,14 @@ def setup_method(self):
self.post_hook = HttpHook(method="POST")
def test_raise_for_status_with_200(self, requests_mock):
- requests_mock.get(
- "http://test:8080/v1/test", status_code=200, text='{"status":{"status": 200}}', reason="OK"
- )
- with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection):
- resp = self.get_hook.run("v1/test")
- assert resp.text == '{"status":{"status": 200}}'
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning)
+ requests_mock.get(
+ "http://test:8080/v1/test", status_code=200, text='{"status":{"status": 200}}', reason="OK"
+ )
+ with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection):
+ resp = self.get_hook.run("v1/test")
+ assert resp.text == '{"status":{"status": 200}}'
@mock.patch("requests.Request")
@mock.patch("requests.Session")
@@ -108,100 +111,114 @@ def test_get_request_with_port(self, mock_session, mock_request):
mock_request.reset_mock()
def test_get_request_do_not_raise_for_status_if_check_response_is_false(self, requests_mock):
- requests_mock.get(
- "http://test:8080/v1/test",
- status_code=404,
- text='{"status":{"status": 404}}',
- reason="Bad request",
- )
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning)
+ requests_mock.get(
+ "http://test:8080/v1/test",
+ status_code=404,
+ text='{"status":{"status": 404}}',
+ reason="Bad request",
+ )
- with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection):
- resp = self.get_hook.run("v1/test", extra_options={"check_response": False})
- assert resp.text == '{"status":{"status": 404}}'
+ with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection):
+ resp = self.get_hook.run("v1/test", extra_options={"check_response": False})
+ assert resp.text == '{"status":{"status": 404}}'
def test_hook_contains_header_from_extra_field(self):
with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection):
- expected_conn = get_airflow_connection()
- conn = self.get_hook.get_conn()
- assert dict(conn.headers, **json.loads(expected_conn.extra)) == conn.headers
- assert conn.headers.get("bearer") == "test"
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning)
+ expected_conn = get_airflow_connection()
+ conn = self.get_hook.get_conn()
+ assert dict(conn.headers, **json.loads(expected_conn.extra)) == conn.headers
+ assert conn.headers.get("bearer") == "test"
def test_hook_ignore_max_redirects_from_extra_field_as_header(self):
airflow_connection = get_airflow_connection_with_extra(extra={"bearer": "test", "max_redirects": 3})
with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=airflow_connection):
- expected_conn = airflow_connection()
- conn = self.get_hook.get_conn()
- assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers
- assert conn.headers.get("bearer") == "test"
- assert conn.headers.get("allow_redirects") is None
- assert conn.proxies == {}
- assert conn.stream is False
- assert conn.verify is True
- assert conn.cert is None
- assert conn.max_redirects == 3
- assert conn.trust_env is True
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning)
+ expected_conn = airflow_connection()
+ conn = self.get_hook.get_conn()
+ assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers
+ assert conn.headers.get("bearer") == "test"
+ assert conn.headers.get("allow_redirects") is None
+ assert conn.proxies == {}
+ assert conn.stream is False
+ assert conn.verify is True
+ assert conn.cert is None
+ assert conn.max_redirects == 3
+ assert conn.trust_env is True
def test_hook_ignore_proxies_from_extra_field_as_header(self):
airflow_connection = get_airflow_connection_with_extra(
extra={"bearer": "test", "proxies": {"http": "http://proxy:80", "https": "https://proxy:80"}}
)
with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=airflow_connection):
- expected_conn = airflow_connection()
- conn = self.get_hook.get_conn()
- assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers
- assert conn.headers.get("bearer") == "test"
- assert conn.headers.get("proxies") is None
- assert conn.proxies == {"http": "http://proxy:80", "https": "https://proxy:80"}
- assert conn.stream is False
- assert conn.verify is True
- assert conn.cert is None
- assert conn.max_redirects == DEFAULT_REDIRECT_LIMIT
- assert conn.trust_env is True
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning)
+ expected_conn = airflow_connection()
+ conn = self.get_hook.get_conn()
+ assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers
+ assert conn.headers.get("bearer") == "test"
+ assert conn.headers.get("proxies") is None
+ assert conn.proxies == {"http": "http://proxy:80", "https": "https://proxy:80"}
+ assert conn.stream is False
+ assert conn.verify is True
+ assert conn.cert is None
+ assert conn.max_redirects == DEFAULT_REDIRECT_LIMIT
+ assert conn.trust_env is True
def test_hook_ignore_verify_from_extra_field_as_header(self):
airflow_connection = get_airflow_connection_with_extra(extra={"bearer": "test", "verify": False})
with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=airflow_connection):
- expected_conn = airflow_connection()
- conn = self.get_hook.get_conn()
- assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers
- assert conn.headers.get("bearer") == "test"
- assert conn.headers.get("verify") is None
- assert conn.proxies == {}
- assert conn.stream is False
- assert conn.verify is False
- assert conn.cert is None
- assert conn.max_redirects == DEFAULT_REDIRECT_LIMIT
- assert conn.trust_env is True
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning)
+ expected_conn = airflow_connection()
+ conn = self.get_hook.get_conn()
+ assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers
+ assert conn.headers.get("bearer") == "test"
+ assert conn.headers.get("verify") is None
+ assert conn.proxies == {}
+ assert conn.stream is False
+ assert conn.verify is False
+ assert conn.cert is None
+ assert conn.max_redirects == DEFAULT_REDIRECT_LIMIT
+ assert conn.trust_env is True
def test_hook_ignore_cert_from_extra_field_as_header(self):
airflow_connection = get_airflow_connection_with_extra(extra={"bearer": "test", "cert": "cert.crt"})
with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=airflow_connection):
- expected_conn = airflow_connection()
- conn = self.get_hook.get_conn()
- assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers
- assert conn.headers.get("bearer") == "test"
- assert conn.headers.get("cert") is None
- assert conn.proxies == {}
- assert conn.stream is False
- assert conn.verify is True
- assert conn.cert == "cert.crt"
- assert conn.max_redirects == DEFAULT_REDIRECT_LIMIT
- assert conn.trust_env is True
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning)
+ expected_conn = airflow_connection()
+ conn = self.get_hook.get_conn()
+ assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers
+ assert conn.headers.get("bearer") == "test"
+ assert conn.headers.get("cert") is None
+ assert conn.proxies == {}
+ assert conn.stream is False
+ assert conn.verify is True
+ assert conn.cert == "cert.crt"
+ assert conn.max_redirects == DEFAULT_REDIRECT_LIMIT
+ assert conn.trust_env is True
def test_hook_ignore_trust_env_from_extra_field_as_header(self):
airflow_connection = get_airflow_connection_with_extra(extra={"bearer": "test", "trust_env": False})
with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=airflow_connection):
- expected_conn = airflow_connection()
- conn = self.get_hook.get_conn()
- assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers
- assert conn.headers.get("bearer") == "test"
- assert conn.headers.get("cert") is None
- assert conn.proxies == {}
- assert conn.stream is False
- assert conn.verify is True
- assert conn.cert is None
- assert conn.max_redirects == DEFAULT_REDIRECT_LIMIT
- assert conn.trust_env is False
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning)
+ expected_conn = airflow_connection()
+ conn = self.get_hook.get_conn()
+ assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers
+ assert conn.headers.get("bearer") == "test"
+ assert conn.headers.get("cert") is None
+ assert conn.proxies == {}
+ assert conn.stream is False
+ assert conn.verify is True
+ assert conn.cert is None
+ assert conn.max_redirects == DEFAULT_REDIRECT_LIMIT
+ assert conn.trust_env is False
@mock.patch("requests.Request")
def test_hook_with_method_in_lowercase(self, mock_requests):
@@ -227,8 +244,10 @@ def test_hook_has_no_header_from_extra(self):
def test_hooks_header_from_extra_is_overridden(self):
with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection):
- conn = self.get_hook.get_conn(headers={"bearer": "newT0k3n"})
- assert conn.headers.get("bearer") == "newT0k3n"
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning)
+ conn = self.get_hook.get_conn(headers={"bearer": "newT0k3n"})
+ assert conn.headers.get("bearer") == "newT0k3n"
def test_post_request(self, requests_mock):
requests_mock.post(
@@ -236,8 +255,10 @@ def test_post_request(self, requests_mock):
)
with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection):
- resp = self.post_hook.run("v1/test")
- assert resp.status_code == 200
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning)
+ resp = self.post_hook.run("v1/test")
+ assert resp.status_code == 200
def test_post_request_with_error_code(self, requests_mock):
requests_mock.post(
@@ -249,7 +270,9 @@ def test_post_request_with_error_code(self, requests_mock):
with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection):
with pytest.raises(AirflowException):
- self.post_hook.run("v1/test")
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning)
+ self.post_hook.run("v1/test")
def test_post_request_do_not_raise_for_status_if_check_response_is_false(self, requests_mock):
requests_mock.post(
@@ -260,8 +283,10 @@ def test_post_request_do_not_raise_for_status_if_check_response_is_false(self, r
)
with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection):
- resp = self.post_hook.run("v1/test", extra_options={"check_response": False})
- assert resp.status_code == 418
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning)
+ resp = self.post_hook.run("v1/test", extra_options={"check_response": False})
+ assert resp.status_code == 418
@pytest.mark.db_test
@mock.patch("airflow.providers.http.hooks.http.requests.Session")
@@ -291,8 +316,10 @@ def test_run_with_advanced_retry(self, requests_mock):
reraise=True,
)
with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection):
- response = self.get_hook.run_with_advanced_retry(endpoint="v1/test", _retry_args=retry_args)
- assert isinstance(response, requests.Response)
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning)
+ response = self.get_hook.run_with_advanced_retry(endpoint="v1/test", _retry_args=retry_args)
+ assert isinstance(response, requests.Response)
def test_header_from_extra_and_run_method_are_merged(self):
def run_and_return(unused_session, prepped_request, unused_extra_options, **kwargs):
@@ -303,10 +330,12 @@ def run_and_return(unused_session, prepped_request, unused_extra_options, **kwar
"airflow.providers.http.hooks.http.HttpHook.run_and_check", side_effect=run_and_return
):
with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection):
- prepared_request = self.get_hook.run("v1/test", headers={"some_other_header": "test"})
- actual = dict(prepared_request.headers)
- assert actual.get("bearer") == "test"
- assert actual.get("some_other_header") == "test"
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning)
+ prepared_request = self.get_hook.run("v1/test", headers={"some_other_header": "test"})
+ actual = dict(prepared_request.headers)
+ assert actual.get("bearer") == "test"
+ assert actual.get("some_other_header") == "test"
@mock.patch("airflow.providers.http.hooks.http.HttpHook.get_connection")
def test_http_connection(self, mock_get_connection):
@@ -362,8 +391,10 @@ def match_obj1(request):
requests_mock.request(method=method, url="//test:8080/v1/test", additional_matcher=match_obj1)
with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection):
- # will raise NoMockAddress exception if obj1 != request.json()
- HttpHook(method=method).run("v1/test", json=obj1)
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning)
+ # will raise NoMockAddress exception if obj1 != request.json()
+ HttpHook(method=method).run("v1/test", json=obj1)
@mock.patch("airflow.providers.http.hooks.http.requests.Session.send")
def test_verify_set_to_true_by_default(self, mock_session_send):
@@ -438,35 +469,43 @@ def test_verify_false_parameter_overwrites_set_requests_ca_bundle_env_var(self,
def test_connection_success(self, requests_mock):
requests_mock.get("http://test:8080", status_code=200, json={"status": {"status": 200}}, reason="OK")
with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection):
- status, msg = self.get_hook.test_connection()
- assert status is True
- assert msg == "Connection successfully tested"
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning)
+ status, msg = self.get_hook.test_connection()
+ assert status is True
+ assert msg == "Connection successfully tested"
def test_connection_failure(self, requests_mock):
requests_mock.get(
"http://test:8080", status_code=500, json={"message": "internal server error"}, reason="NOT_OK"
)
with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection):
- status, msg = self.get_hook.test_connection()
- assert status is False
- assert msg == "500:NOT_OK"
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning)
+ status, msg = self.get_hook.test_connection()
+ assert status is False
+ assert msg == "500:NOT_OK"
@mock.patch("requests.auth.AuthBase.__init__")
def test_loginless_custom_auth_initialized_with_no_args(self, auth):
with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection):
- auth.return_value = None
- hook = HttpHook("GET", "http_default", AuthBase)
- hook.get_conn()
- auth.assert_called_once_with()
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning)
+ auth.return_value = None
+ hook = HttpHook("GET", "http_default", AuthBase)
+ hook.get_conn()
+ auth.assert_called_once_with()
@mock.patch("requests.auth.AuthBase.__init__")
def test_loginless_custom_auth_initialized_with_args(self, auth):
with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection):
- auth.return_value = None
- auth_with_args = functools.partial(AuthBase, "test_arg")
- hook = HttpHook("GET", "http_default", auth_with_args)
- hook.get_conn()
- auth.assert_called_once_with("test_arg")
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning)
+ auth.return_value = None
+ auth_with_args = functools.partial(AuthBase, "test_arg")
+ hook = HttpHook("GET", "http_default", auth_with_args)
+ hook.get_conn()
+ auth.assert_called_once_with("test_arg")
@mock.patch("requests.auth.HTTPBasicAuth.__init__")
def test_login_password_basic_auth_initialized(self, auth):
@@ -474,18 +513,22 @@ def test_login_password_basic_auth_initialized(self, auth):
"airflow.hooks.base.BaseHook.get_connection",
side_effect=get_airflow_connection_with_login_and_password,
):
- auth.return_value = None
- hook = HttpHook("GET", "http_default", HTTPBasicAuth)
- hook.get_conn()
- auth.assert_called_once_with("username", "pass")
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning)
+ auth.return_value = None
+ hook = HttpHook("GET", "http_default", HTTPBasicAuth)
+ hook.get_conn()
+ auth.assert_called_once_with("username", "pass")
@mock.patch("requests.auth.HTTPBasicAuth.__init__")
def test_default_auth_not_initialized(self, auth):
with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection):
- auth.return_value = None
- hook = HttpHook("GET", "http_default")
- hook.get_conn()
- auth.assert_not_called()
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", category=AirflowProviderDeprecationWarning)
+ auth.return_value = None
+ hook = HttpHook("GET", "http_default")
+ hook.get_conn()
+ auth.assert_not_called()
def test_keep_alive_enabled(self):
with (
diff --git a/providers/tests/http/sensors/test_http.py b/providers/tests/http/sensors/test_http.py
index 78a11e15bb7c1..47af8f49c48cf 100644
--- a/providers/tests/http/sensors/test_http.py
+++ b/providers/tests/http/sensors/test_http.py
@@ -238,10 +238,14 @@ def resp_check(_):
class FakeSession:
+ """Mock requests.Session object."""
+
def __init__(self):
self.response = requests.Response()
self.response.status_code = 200
self.response._content = "apache/airflow".encode("ascii", "ignore")
+ self.headers = {}
+ self.auth = None
def send(self, *args, **kwargs):
return self.response
diff --git a/tests/www/views/test_views_connection.py b/tests/www/views/test_views_connection.py
index f9a4efd11c15b..19a36c0ac6b39 100644
--- a/tests/www/views/test_views_connection.py
+++ b/tests/www/views/test_views_connection.py
@@ -459,8 +459,12 @@ def test_process_form_invalid_extra_removed(admin_client):
"""
Test that when an invalid json `extra` is passed in the form, it is removed and _not_
saved over the existing extras.
+
+ Note: This can only be tested with a Hook which does not have any custom fields (otherwise
+ the custom fields override the extra data when editing a Connection). Thus, this is currently
+ tested with ftp.
"""
- conn_details = {"conn_id": "test_conn", "conn_type": "http"}
+ conn_details = {"conn_id": "test_conn", "conn_type": "ftp"}
conn = Connection(**conn_details, extra='{"foo": "bar"}')
conn.id = 1