diff --git a/airflow/contrib/hooks/azure_container_registry_hook.py b/airflow/contrib/hooks/azure_container_registry_hook.py index af38c1a94380e..b8604b785005f 100644 --- a/airflow/contrib/hooks/azure_container_registry_hook.py +++ b/airflow/contrib/hooks/azure_container_registry_hook.py @@ -16,10 +16,12 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +"""Hook for Azure Container Registry""" -from airflow.hooks.base_hook import BaseHook from azure.mgmt.containerinstance.models import ImageRegistryCredential +from airflow.hooks.base_hook import BaseHook + class AzureContainerRegistryHook(BaseHook): """ diff --git a/airflow/contrib/hooks/cloudant_hook.py b/airflow/contrib/hooks/cloudant_hook.py index ac64e8950d99f..a29f71dcb3a81 100644 --- a/airflow/contrib/hooks/cloudant_hook.py +++ b/airflow/contrib/hooks/cloudant_hook.py @@ -16,7 +16,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - +"""Hook for Cloudant""" from cloudant import cloudant from airflow.exceptions import AirflowException diff --git a/airflow/contrib/hooks/databricks_hook.py b/airflow/contrib/hooks/databricks_hook.py index 077bd9e4c6776..7b8ac5e8b3b98 100644 --- a/airflow/contrib/hooks/databricks_hook.py +++ b/airflow/contrib/hooks/databricks_hook.py @@ -16,17 +16,17 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - +"""Databricks hook.""" from urllib.parse import urlparse +from time import sleep import requests +from requests import exceptions as requests_exceptions +from requests.auth import AuthBase from airflow import __version__ from airflow.exceptions import AirflowException from airflow.hooks.base_hook import BaseHook -from requests import exceptions as requests_exceptions -from requests.auth import AuthBase -from time import sleep RESTART_CLUSTER_ENDPOINT = ("POST", "api/2.0/clusters/restart") START_CLUSTER_ENDPOINT = ("POST", "api/2.0/clusters/start") @@ -39,29 +39,59 @@ USER_AGENT_HEADER = {'user-agent': 'airflow-{v}'.format(v=__version__)} +class RunState: + """ + Utility class for the run state concept of Databricks runs. + """ + def __init__(self, life_cycle_state, result_state, state_message): + self.life_cycle_state = life_cycle_state + self.result_state = result_state + self.state_message = state_message + + @property + def is_terminal(self) -> bool: + """True if the current state is a terminal state.""" + if self.life_cycle_state not in RUN_LIFE_CYCLE_STATES: + raise AirflowException( + ('Unexpected life cycle state: {}: If the state has ' + 'been introduced recently, please check the Databricks user ' + 'guide for troubleshooting information').format( + self.life_cycle_state)) + return self.life_cycle_state in ('TERMINATED', 'SKIPPED', 'INTERNAL_ERROR') + + @property + def is_successful(self) -> bool: + """True if the result state is SUCCESS""" + return self.result_state == 'SUCCESS' + + def __eq__(self, other): + return self.life_cycle_state == other.life_cycle_state and \ + self.result_state == other.result_state and \ + self.state_message == other.state_message + + def __repr__(self): + return str(self.__dict__) + + +# noinspection PyAbstractClass class DatabricksHook(BaseHook): """ Interact with Databricks. + + :param databricks_conn_id: The name of the databricks connection to use. 
+    :type databricks_conn_id: str
+    :param timeout_seconds: The amount of time in seconds the requests library
+        will wait before timing out.
+    :type timeout_seconds: int
+    :param retry_limit: The number of times to retry the connection in case of
+        service outages.
+    :type retry_limit: int
+    :param retry_delay: The number of seconds to wait between retries (it
+        might be a floating point number).
+    :type retry_delay: float
+    """
-    def __init__(
-            self,
-            databricks_conn_id='databricks_default',
-            timeout_seconds=180,
-            retry_limit=3,
-            retry_delay=1.0):
-        """
-        :param databricks_conn_id: The name of the databricks connection to use.
-        :type databricks_conn_id: str
-        :param timeout_seconds: The amount of time in seconds the requests library
-            will wait before timing-out.
-        :type timeout_seconds: int
-        :param retry_limit: The number of times to retry the connection in case of
-            service outages.
-        :type retry_limit: int
-        :param retry_delay: The number of seconds to wait between retries (it
-            might be a floating point number).
-        :type retry_delay: float
-        """
+    def __init__(self, databricks_conn_id='databricks_default', timeout_seconds=180, retry_limit=3,
+                 retry_delay=1.0):
         self.databricks_conn_id = databricks_conn_id
         self.databricks_conn = self.get_connection(databricks_conn_id)
         self.timeout_seconds = timeout_seconds
@@ -189,12 +219,24 @@ def submit_run(self, json):
         response = self._do_api_call(SUBMIT_RUN_ENDPOINT, json)
         return response['run_id']

-    def get_run_page_url(self, run_id):
+    def get_run_page_url(self, run_id: str) -> str:
+        """
+        Retrieves run_page_url.
+
+        :param run_id: id of the run
+        :return: URL of the run page
+        """
         json = {'run_id': run_id}
         response = self._do_api_call(GET_RUN_ENDPOINT, json)
         return response['run_page_url']

-    def get_run_state(self, run_id):
+    def get_run_state(self, run_id: str) -> RunState:
+        """
+        Retrieves run state of the run.
+
+        :param run_id: id of the run
+        :return: state of the run
+        """
         json = {'run_id': run_id}
         response = self._do_api_call(GET_RUN_ENDPOINT, json)
         state = response['state']
@@ -204,23 +246,42 @@ def get_run_state(self, run_id):
         state_message = state['state_message']
         return RunState(life_cycle_state, result_state, state_message)

-    def cancel_run(self, run_id):
+    def cancel_run(self, run_id: str) -> None:
+        """
+        Cancels the run.
+
+        :param run_id: id of the run
+        """
         json = {'run_id': run_id}
         self._do_api_call(CANCEL_RUN_ENDPOINT, json)

-    def restart_cluster(self, json):
+    def restart_cluster(self, json: dict) -> None:
+        """
+        Restarts the cluster.
+
+        :param json: json dictionary containing cluster specification.
+        """
         self._do_api_call(RESTART_CLUSTER_ENDPOINT, json)

-    def start_cluster(self, json):
+    def start_cluster(self, json: dict) -> None:
+        """
+        Starts the cluster.
+
+        :param json: json dictionary containing cluster specification.
+        """
         self._do_api_call(START_CLUSTER_ENDPOINT, json)

-    def terminate_cluster(self, json):
+    def terminate_cluster(self, json: dict) -> None:
+        """
+        Terminates the cluster.
+
+        :param json: json dictionary containing cluster specification.
+ """ self._do_api_call(TERMINATE_CLUSTER_ENDPOINT, json) def _retryable_error(exception): - return isinstance(exception, requests_exceptions.ConnectionError) \ - or isinstance(exception, requests_exceptions.Timeout) \ + return isinstance(exception, (requests_exceptions.ConnectionError, requests_exceptions.Timeout)) \ or exception.response is not None and exception.response.status_code >= 500 @@ -234,38 +295,6 @@ def _retryable_error(exception): ] -class RunState: - """ - Utility class for the run state concept of Databricks runs. - """ - def __init__(self, life_cycle_state, result_state, state_message): - self.life_cycle_state = life_cycle_state - self.result_state = result_state - self.state_message = state_message - - @property - def is_terminal(self): - if self.life_cycle_state not in RUN_LIFE_CYCLE_STATES: - raise AirflowException( - ('Unexpected life cycle state: {}: If the state has ' - 'been introduced recently, please check the Databricks user ' - 'guide for troubleshooting information').format( - self.life_cycle_state)) - return self.life_cycle_state in ('TERMINATED', 'SKIPPED', 'INTERNAL_ERROR') - - @property - def is_successful(self): - return self.result_state == 'SUCCESS' - - def __eq__(self, other): - return self.life_cycle_state == other.life_cycle_state and \ - self.result_state == other.result_state and \ - self.state_message == other.state_message - - def __repr__(self): - return str(self.__dict__) - - class _TokenAuth(AuthBase): """ Helper class for requests Auth field. AuthBase requires you to implement the __call__ diff --git a/airflow/contrib/hooks/jira_hook.py b/airflow/contrib/hooks/jira_hook.py index c8e3add439fda..088e2d2a69dba 100644 --- a/airflow/contrib/hooks/jira_hook.py +++ b/airflow/contrib/hooks/jira_hook.py @@ -16,6 +16,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +"""Hook for JIRA""" from jira import JIRA from jira.exceptions import JIRAError @@ -45,41 +46,41 @@ def get_conn(self): get_server_info = True validate = True extra_options = {} - conn = None + if not self.jira_conn_id: + raise AirflowException('Failed to create jira client. 
No jira_conn_id provided')

-        if self.jira_conn_id is not None:
-            conn = self.get_connection(self.jira_conn_id)
-            if conn.extra is not None:
-                extra_options = conn.extra_dejson
-                # only required attributes are taken for now,
-                # more can be added ex: async, logging, max_retries
+        conn = self.get_connection(self.jira_conn_id)
+        if conn.extra is not None:
+            extra_options = conn.extra_dejson
+            # only required attributes are taken for now,
+            # more can be added ex: async, logging, max_retries

-                # verify
-                if 'verify' in extra_options \
-                        and extra_options['verify'].lower() == 'false':
-                    extra_options['verify'] = False
+            # verify
+            if 'verify' in extra_options \
+                    and extra_options['verify'].lower() == 'false':
+                extra_options['verify'] = False

-                # validate
-                if 'validate' in extra_options \
-                        and extra_options['validate'].lower() == 'false':
-                    validate = False
+            # validate
+            if 'validate' in extra_options \
+                    and extra_options['validate'].lower() == 'false':
+                validate = False

-                if 'get_server_info' in extra_options \
-                        and extra_options['get_server_info'].lower() == 'false':
-                    get_server_info = False
+            if 'get_server_info' in extra_options \
+                    and extra_options['get_server_info'].lower() == 'false':
+                get_server_info = False

-            try:
-                self.client = JIRA(conn.host,
-                                   options=extra_options,
-                                   basic_auth=(conn.login, conn.password),
-                                   get_server_info=get_server_info,
-                                   validate=validate,
-                                   proxies=self.proxies)
-            except JIRAError as jira_error:
-                raise AirflowException('Failed to create jira client, jira error: %s'
-                                       % str(jira_error))
-            except Exception as e:
-                raise AirflowException('Failed to create jira client, error: %s'
-                                       % str(e))
+        try:
+            self.client = JIRA(conn.host,
+                               options=extra_options,
+                               basic_auth=(conn.login, conn.password),
+                               get_server_info=get_server_info,
+                               validate=validate,
+                               proxies=self.proxies)
+        except JIRAError as jira_error:
+            raise AirflowException('Failed to create jira client, jira error: %s'
+                                   % str(jira_error))
+        except Exception as e:
+            raise AirflowException('Failed to create jira client, error: %s'
+                                   % str(e))

         return self.client
diff --git a/airflow/contrib/hooks/mongo_hook.py b/airflow/contrib/hooks/mongo_hook.py
index 623d054ca350c..6ab96065d9a08 100644
--- a/airflow/contrib/hooks/mongo_hook.py
+++ b/airflow/contrib/hooks/mongo_hook.py
@@ -16,10 +16,11 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+"""Hook for Mongo DB"""
 from ssl import CERT_NONE

+from pymongo import MongoClient, ReplaceOne
 from airflow.hooks.base_hook import BaseHook
-from pymongo import MongoClient, ReplaceOne


 class MongoHook(BaseHook):
@@ -83,7 +84,8 @@ def get_conn(self):

         return self.client

-    def close_conn(self):
+    def close_conn(self) -> None:
+        """Closes connection"""
         client = self.client
         if client is not None:
             client.close()
diff --git a/airflow/contrib/hooks/redis_hook.py b/airflow/contrib/hooks/redis_hook.py
index a63c839e34c85..3ef7e87003cbe 100644
--- a/airflow/contrib/hooks/redis_hook.py
+++ b/airflow/contrib/hooks/redis_hook.py
@@ -24,6 +24,7 @@
 from airflow.hooks.base_hook import BaseHook


+# noinspection PyAbstractClass
 class RedisHook(BaseHook):
     """
     Wrapper for connection to interact with Redis in-memory data structure store
diff --git a/airflow/contrib/hooks/salesforce_hook.py b/airflow/contrib/hooks/salesforce_hook.py
index 33bfc73b13cf7..ecca48115b590 100644
--- a/airflow/contrib/hooks/salesforce_hook.py
+++ b/airflow/contrib/hooks/salesforce_hook.py
@@ -34,26 +34,24 @@


 class SalesforceHook(BaseHook):
+    """
+    Creates new connection to Salesforce and allows you to pull data out of SFDC and save it to a file.

-    def __init__(self, conn_id):
-        """
-        Create new connection to Salesforce and allows you to pull data out of SFDC and save it to a file.
+    You can then use that file with other Airflow operators to move the data into another data source.

-        You can then use that file with other Airflow operators to move the data into another data source.
+    :param conn_id: the name of the connection that has the parameters we need to connect to Salesforce.
+        The connection should be type `http` and include a user's security token in the `Extras` field.
+    :type conn_id: str

-        :param conn_id: the name of the connection that has the parameters we need to connect to Salesforce.
-            The connection should be type `http` and include a user's security token in the `Extras` field.
-        :type conn_id: str
+    .. note::
+        For the HTTP connection type, you can include a
+        JSON structure in the `Extras` field.
+        We need a user's security token to connect to Salesforce.
+        So we define it in the `Extras` field as `{"security_token":"YOUR_SECURITY_TOKEN"}`

-        .. note::
-            For the HTTP connection type, you can include a
-            JSON structure in the `Extras` field.
-            We need a user's security token to connect to Salesforce.
-            So we define it in the `Extras` field as:
-            `{"security_token":"YOUR_SECURITY_TOKEN"}`
+    """

-        """
-        super().__init__(conn_id)
+    def __init__(self, conn_id):
         self.conn_id = conn_id
         self.conn = None
diff --git a/airflow/contrib/hooks/ssh_hook.py b/airflow/contrib/hooks/ssh_hook.py
index 10085507513d1..b431b9f703928 100755
--- a/airflow/contrib/hooks/ssh_hook.py
+++ b/airflow/contrib/hooks/ssh_hook.py
@@ -16,7 +16,7 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+"""Hook for SSH connections."""
 import getpass
 import os
 import warnings
@@ -29,6 +29,7 @@
 from airflow.hooks.base_hook import BaseHook


+# noinspection PyAbstractClass
 class SSHHook(BaseHook):
     """
     Hook for ssh remote execution using Paramiko.
@@ -238,7 +239,16 @@ def get_tunnel(self, remote_port, remote_host="localhost", local_port=None):

         return client

-    def create_tunnel(self, local_port, remote_port=None, remote_host="localhost"):
+    def create_tunnel(self, local_port: int, remote_port: int = None, remote_host: str = "localhost") \
+            -> SSHTunnelForwarder:
+        """
+        Creates tunnel for SSH connection [Deprecated].
+
+        :param local_port: local port number
+        :param remote_port: remote port number
+        :param remote_host: remote host
+        :return: sshtunnel.SSHTunnelForwarder object
+        """
         warnings.warn('SSHHook.create_tunnel is deprecated, Please'
                       'use get_tunnel() instead. But please note that the'
                       'order of the parameters have changed'
diff --git a/airflow/contrib/hooks/winrm_hook.py b/airflow/contrib/hooks/winrm_hook.py
index 492e375156a11..f81756c6fa816 100644
--- a/airflow/contrib/hooks/winrm_hook.py
+++ b/airflow/contrib/hooks/winrm_hook.py
@@ -17,7 +17,7 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-
+"""Hook for winrm remote execution."""
 import getpass

 from winrm.protocol import Protocol
@@ -26,6 +26,8 @@
 from airflow.hooks.base_hook import BaseHook


+# TODO: Fixme please - I have too complex implementation
+# pylint: disable=too-many-instance-attributes,too-many-arguments,too-many-branches
 class WinRMHook(BaseHook):
     """
     Hook for winrm remote execution using pywinrm.
diff --git a/airflow/hooks/base_hook.py b/airflow/hooks/base_hook.py
index c6b31eb229b84..433a7aec9398f 100644
--- a/airflow/hooks/base_hook.py
+++ b/airflow/hooks/base_hook.py
@@ -16,7 +16,7 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+"""Base class for all hooks"""
 import os
 import random
 from typing import Iterable
@@ -62,6 +62,12 @@ def _get_connection_from_env(cls, conn_id):

     @classmethod
     def get_connections(cls, conn_id: str) -> Iterable[Connection]:
+        """
+        Get all connections as an iterable.
+
+        :param conn_id: connection id
+        :return: list of connections
+        """
         conn = cls._get_connection_from_env(conn_id)
         if conn:
             conns = [conn]
@@ -71,6 +77,12 @@ def get_connections(cls, conn_id: str) -> Iterable[Connection]:

     @classmethod
     def get_connection(cls, conn_id: str) -> Connection:
+        """
+        Get a random connection from all connections configured with this connection id.
+
+        :param conn_id: connection id
+        :return: connection
+        """
         conn = random.choice(list(cls.get_connections(conn_id)))
         if conn.host:
             log = LoggingMixin().log
@@ -79,18 +91,31 @@ def get_connection(cls, conn_id: str) -> Connection:

     @classmethod
     def get_hook(cls, conn_id: str) -> "BaseHook":
-        # TODO: set method return type to BaseHook class when on 3.7+. See https://stackoverflow.com/a/33533514/3066428 # noqa: E501
+        """
+        Returns default hook for this connection id.
+        :param conn_id: connection id
+        :return: default hook for this connection
+        """
+        # TODO: set method return type to BaseHook class when on 3.7+.
+        # See https://stackoverflow.com/a/33533514/3066428
         connection = cls.get_connection(conn_id)
         return connection.get_hook()

     def get_conn(self):
+        """Returns connection for the hook."""
         raise NotImplementedError()

     def get_records(self, sql):
+        """Returns records for the sql query (for hooks that support SQL)."""
+        # TODO: move it out from the base hook. It belongs to some common SQL hook most likely
         raise NotImplementedError()

     def get_pandas_df(self, sql):
+        """Returns pandas dataframe for the sql query (for hooks that support SQL)."""
+        # TODO: move it out from the base hook. It belongs to some common SQL hook most likely
         raise NotImplementedError()

     def run(self, sql):
+        """Runs SQL query (for hooks that support SQL)."""
+        # TODO: move it out from the base hook. It belongs to some common SQL hook most likely
         raise NotImplementedError()
diff --git a/airflow/hooks/hdfs_hook.py b/airflow/hooks/hdfs_hook.py
index 31b2f11501a01..8dbd5e7030c3e 100644
--- a/airflow/hooks/hdfs_hook.py
+++ b/airflow/hooks/hdfs_hook.py
@@ -16,21 +16,23 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+"""Hook for HDFS operations"""
 from airflow import configuration
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook

 try:
-    from snakebite.client import Client, HAClient, Namenode, AutoConfigClient
+    from snakebite.client import Client, HAClient, Namenode, AutoConfigClient  # pylint: disable=syntax-error
     snakebite_loaded = True
 except ImportError:
     snakebite_loaded = False


 class HDFSHookException(AirflowException):
-    pass
+    """Exception specific to HDFS"""


+# noinspection PyAbstractClass
 class HDFSHook(BaseHook):
     """
     Interact with HDFS. This class is a wrapper around the snakebite library.
@@ -87,8 +89,8 @@ def get_conn(self):
                             effective_user=effective_user, use_sasl=use_sasl,
                             hdfs_namenode_principal=hdfs_namenode_principal)
         elif len(connections) > 1:
-            nn = [Namenode(conn.host, conn.port) for conn in connections]
-            client = HAClient(nn, effective_user=effective_user,
+            name_node = [Namenode(conn.host, conn.port) for conn in connections]
+            client = HAClient(name_node, effective_user=effective_user,
                               use_sasl=use_sasl,
                               hdfs_namenode_principal=hdfs_namenode_principal)
         else:
diff --git a/airflow/hooks/slack_hook.py b/airflow/hooks/slack_hook.py
index 953ce319b1eee..bf8cd0fbb198f 100644
--- a/airflow/hooks/slack_hook.py
+++ b/airflow/hooks/slack_hook.py
@@ -16,28 +16,24 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+"""Hook for Slack"""
 from slackclient import SlackClient

 from airflow.hooks.base_hook import BaseHook
 from airflow.exceptions import AirflowException


+# noinspection PyAbstractClass
 class SlackHook(BaseHook):
     """
-    Interact with Slack, using slackclient library.
-    """
-
-    def __init__(self, token=None, slack_conn_id=None):
-        """
-        Takes both Slack API token directly and connection that has Slack API token.
+    Takes both Slack API token directly and connection that has Slack API token.

-        If both supplied, Slack API token will be used.
+    If both supplied, Slack API token will be used.

-        :param token: Slack API token
-        :type token: str
-        :param slack_conn_id: connection that has Slack API token in the password field
-        :type slack_conn_id: str
-        """
+    :param token: Slack API token
+    :param slack_conn_id: connection that has Slack API token in the password field
+    """
+    def __init__(self, token: str = None, slack_conn_id: str = None) -> None:
         self.token = self.__get_token(token, slack_conn_id)

     def __get_token(self, token, slack_conn_id):
@@ -53,10 +49,16 @@ def __get_token(self, token, slack_conn_id):
             raise AirflowException('Cannot get token: '
                                    'No valid Slack token nor slack_conn_id supplied.')

-    def call(self, method, api_params):
-        sc = SlackClient(self.token)
-        rc = sc.api_call(method, **api_params)
+    def call(self, method: str, api_params: dict) -> None:
+        """
+        Calls the Slack client.
+
+        :param method: the API method to be called
+        :param api_params: the parameters of the API call
+        """
+        slack_client = SlackClient(self.token)
+        return_code = slack_client.api_call(method, **api_params)

-        if not rc['ok']:
-            msg = "Slack API call failed ({})".format(rc['error'])
+        if not return_code['ok']:
+            msg = "Slack API call failed ({})".format(return_code['error'])
             raise AirflowException(msg)
diff --git a/airflow/hooks/webhdfs_hook.py b/airflow/hooks/webhdfs_hook.py
index e93877b2ab80c..73314dc14efc9 100644
--- a/airflow/hooks/webhdfs_hook.py
+++ b/airflow/hooks/webhdfs_hook.py
@@ -16,7 +16,7 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+"""Hook for Web HDFS"""
 from hdfs import InsecureClient, HdfsError

 from airflow import configuration
@@ -27,7 +27,7 @@
 _kerberos_security_mode = configuration.conf.get("core", "security") == "kerberos"
 if _kerberos_security_mode:
     try:
-        from hdfs.ext.kerberos import KerberosClient
+        from hdfs.ext.kerberos import KerberosClient  # pylint: disable=ungrouped-imports
     except ImportError:
         log = LoggingMixin().log
         log.error("Could not load the Kerberos extension for the WebHDFSHook.")
@@ -35,7 +35,7 @@


 class AirflowWebHDFSHookException(AirflowException):
-    pass
+    """Exception specific to WebHDFS hook"""


 class WebHDFSHook(BaseHook):
diff --git a/scripts/ci/pylint_todo.txt b/scripts/ci/pylint_todo.txt
index 8a26fca822223..43f0cfe5a845c 100644
--- a/scripts/ci/pylint_todo.txt
+++ b/scripts/ci/pylint_todo.txt
@@ -8,13 +8,11 @@
 ./airflow/contrib/auth/backends/ldap_auth.py
 ./airflow/contrib/auth/backends/password_auth.py
 ./airflow/contrib/hooks/azure_container_instance_hook.py
-./airflow/contrib/hooks/azure_container_registry_hook.py
 ./airflow/contrib/hooks/azure_container_volume_hook.py
 ./airflow/contrib/hooks/azure_cosmos_hook.py
 ./airflow/contrib/hooks/azure_data_lake_hook.py
 ./airflow/contrib/hooks/azure_fileshare_hook.py
 ./airflow/contrib/hooks/cassandra_hook.py
-./airflow/contrib/hooks/cloudant_hook.py
 ./airflow/contrib/hooks/databricks_hook.py
 ./airflow/contrib/hooks/datadog_hook.py
 ./airflow/contrib/hooks/datastore_hook.py
@@ -25,8 +23,6 @@
 ./airflow/contrib/hooks/ftp_hook.py
 ./airflow/contrib/hooks/grpc_hook.py
 ./airflow/contrib/hooks/jenkins_hook.py
-./airflow/contrib/hooks/jira_hook.py
-./airflow/contrib/hooks/mongo_hook.py
 ./airflow/contrib/hooks/openfaas_hook.py
 ./airflow/contrib/hooks/opsgenie_alert_hook.py
 ./airflow/contrib/hooks/pinot_hook.py
@@ -34,7 +30,6 @@
 ./airflow/contrib/hooks/qubole_hook.py
 ./airflow/contrib/hooks/redshift_hook.py
 ./airflow/contrib/hooks/sagemaker_hook.py
-./airflow/contrib/hooks/salesforce_hook.py
 ./airflow/contrib/hooks/segment_hook.py
 ./airflow/contrib/hooks/sftp_hook.py
 ./airflow/contrib/hooks/slack_webhook_hook.py
@@ -44,10 +39,8 @@
 ./airflow/contrib/hooks/spark_sql_hook.py
 ./airflow/contrib/hooks/spark_submit_hook.py
 ./airflow/contrib/hooks/sqoop_hook.py
-./airflow/contrib/hooks/ssh_hook.py
 ./airflow/contrib/hooks/vertica_hook.py
 ./airflow/contrib/hooks/wasb_hook.py
-./airflow/contrib/hooks/winrm_hook.py
 ./airflow/contrib/operators/adls_list_operator.py
 ./airflow/contrib/operators/adls_to_gcs.py
 ./airflow/contrib/operators/awsbatch_operator.py
@@ -155,7 +148,6 @@
 ./airflow/executors/local_executor.py
 ./airflow/executors/sequential_executor.py
 ./airflow/executors/__init__.py
-./airflow/hooks/base_hook.py
 ./airflow/hooks/dbapi_hook.py
 ./airflow/hooks/docker_hook.py
 ./airflow/hooks/druid_hook.py
@@ -170,9 +162,7 @@
 ./airflow/hooks/postgres_hook.py
 ./airflow/hooks/presto_hook.py
 ./airflow/hooks/samba_hook.py
-./airflow/hooks/slack_hook.py
 ./airflow/hooks/sqlite_hook.py
-./airflow/hooks/webhdfs_hook.py
 ./airflow/hooks/zendesk_hook.py
 ./airflow/hooks/__init__.py
 ./airflow/jobs/backfill_job.py
@@ -413,7 +403,6 @@
 ./tests/contrib/hooks/test_spark_sql_hook.py
 ./tests/contrib/hooks/test_spark_submit_hook.py
 ./tests/contrib/hooks/test_sqoop_hook.py
-./tests/contrib/hooks/test_ssh_hook.py
 ./tests/contrib/hooks/test_zendesk_hook.py
 ./tests/contrib/operators/test_aws_sqs_publish_operator.py
 ./tests/contrib/operators/test_azure_container_instances_operator.py
diff --git a/tests/contrib/hooks/test_ssh_hook.py b/tests/contrib/hooks/test_ssh_hook.py
index 15b853a4236b9..2b14bbe1c72cf 100644
--- a/tests/contrib/hooks/test_ssh_hook.py
+++ b/tests/contrib/hooks/test_ssh_hook.py
@@ -18,12 +18,12 @@
 # under the License.

 import unittest
+
 from airflow.models import Connection
 from airflow.utils import db
-from tests.compat import mock
-
 from airflow.contrib.hooks.ssh_hook import SSHHook
+from tests.compat import mock

 HELLO_SERVER_CMD = """
 import socket, sys
@@ -156,11 +156,11 @@ def test_tunnel(self):
         with hook.create_tunnel(2135, 2134):
             server_output = server_handle.stdout.read(5)
             self.assertEqual(server_output, b"ready")
-            s = socket.socket()
-            s.connect(("localhost", 2135))
-            response = s.recv(5)
+            client_socket = socket.socket()
+            client_socket.connect(("localhost", 2135))
+            response = client_socket.recv(5)
             self.assertEqual(response, b"hello")
-            s.close()
+            client_socket.close()
             server_handle.communicate()
             self.assertEqual(server_handle.returncode, 0)
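
For reference, a minimal usage sketch of the refactored DatabricksHook and RunState, assuming the hook's default 'databricks_default' connection; the cluster id, notebook path, and polling interval are hypothetical:

    from time import sleep

    from airflow.contrib.hooks.databricks_hook import DatabricksHook
    from airflow.exceptions import AirflowException

    hook = DatabricksHook(databricks_conn_id='databricks_default', retry_limit=3, retry_delay=1.0)
    # submit_run() wraps the Databricks runs/submit endpoint and returns the run id
    run_id = hook.submit_run({
        'existing_cluster_id': '1234-567890-abc123',  # hypothetical cluster id
        'notebook_task': {'notebook_path': '/Users/someone/example-notebook'},  # hypothetical path
    })
    run_state = hook.get_run_state(run_id)
    while not run_state.is_terminal:  # TERMINATED, SKIPPED or INTERNAL_ERROR
        sleep(10)
        run_state = hook.get_run_state(run_id)
    if not run_state.is_successful:  # i.e. result_state != 'SUCCESS'
        raise AirflowException('Run failed: {}'.format(run_state.state_message))
    print(hook.get_run_page_url(run_id))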
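
The connection lookup chain now documented in base_hook.py, in miniature (the 'my_postgres' connection id is hypothetical):

    from airflow.hooks.base_hook import BaseHook

    connections = BaseHook.get_connections('my_postgres')  # every Connection stored under this id
    connection = BaseHook.get_connection('my_postgres')    # one of them, chosen at random
    hook = BaseHook.get_hook('my_postgres')                # resolved via Connection.get_hook()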
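
And a sketch of the reworked SlackHook.call(); the token and channel are placeholders, while chat.postMessage is a standard Slack Web API method:

    from airflow.hooks.slack_hook import SlackHook

    hook = SlackHook(token='xoxb-placeholder')  # or SlackHook(slack_conn_id=...) with the token in the password field
    # raises AirflowException when the API response is not 'ok'
    hook.call('chat.postMessage', {'channel': '#general', 'text': 'Hello from Airflow'})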