Skip to content

Commit

Permalink
[AIRFLOW-5248] Pylint fixes related to source constructor param removal
Browse files Browse the repository at this point in the history
  • Loading branch information
potiuk committed Aug 18, 2019
1 parent 053f522 commit 6ad628c
Show file tree
Hide file tree
Showing 15 changed files with 222 additions and 159 deletions.
4 changes: 3 additions & 1 deletion airflow/contrib/hooks/azure_container_registry_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Hook for Azure Container Registry"""

from airflow.hooks.base_hook import BaseHook
from azure.mgmt.containerinstance.models import ImageRegistryCredential

from airflow.hooks.base_hook import BaseHook


class AzureContainerRegistryHook(BaseHook):
"""
Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/hooks/cloudant_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Hook for Cloudant"""
from cloudant import cloudant

from airflow.exceptions import AirflowException
Expand Down
155 changes: 92 additions & 63 deletions airflow/contrib/hooks/databricks_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Databricks hook."""
from urllib.parse import urlparse
from time import sleep

import requests
from requests import exceptions as requests_exceptions
from requests.auth import AuthBase

from airflow import __version__
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from requests import exceptions as requests_exceptions
from requests.auth import AuthBase
from time import sleep

RESTART_CLUSTER_ENDPOINT = ("POST", "api/2.0/clusters/restart")
START_CLUSTER_ENDPOINT = ("POST", "api/2.0/clusters/start")
Expand All @@ -39,29 +39,59 @@
USER_AGENT_HEADER = {'user-agent': 'airflow-{v}'.format(v=__version__)}


class RunState:
"""
Utility class for the run state concept of Databricks runs.
"""
def __init__(self, life_cycle_state, result_state, state_message):
self.life_cycle_state = life_cycle_state
self.result_state = result_state
self.state_message = state_message

@property
def is_terminal(self) -> bool:
"""True if the current state is a terminal state."""
if self.life_cycle_state not in RUN_LIFE_CYCLE_STATES:
raise AirflowException(
('Unexpected life cycle state: {}: If the state has '
'been introduced recently, please check the Databricks user '
'guide for troubleshooting information').format(
self.life_cycle_state))
return self.life_cycle_state in ('TERMINATED', 'SKIPPED', 'INTERNAL_ERROR')

@property
def is_successful(self) -> bool:
"""True if the result state is SUCCESS"""
return self.result_state == 'SUCCESS'

def __eq__(self, other):
return self.life_cycle_state == other.life_cycle_state and \
self.result_state == other.result_state and \
self.state_message == other.state_message

def __repr__(self):
return str(self.__dict__)


# noinspection PyAbstractClass
class DatabricksHook(BaseHook):
"""
Interact with Databricks.
:param databricks_conn_id: The name of the databricks connection to use.
:type databricks_conn_id: str
:param timeout_seconds: The amount of time in seconds the requests library
will wait before timing-out.
:type timeout_seconds: int
:param retry_limit: The number of times to retry the connection in case of
service outages.
:type retry_limit: int
:param retry_delay: The number of seconds to wait between retries (it
might be a floating point number).
:type retry_delay: float
"""
def __init__(
self,
databricks_conn_id='databricks_default',
timeout_seconds=180,
retry_limit=3,
retry_delay=1.0):
"""
:param databricks_conn_id: The name of the databricks connection to use.
:type databricks_conn_id: str
:param timeout_seconds: The amount of time in seconds the requests library
will wait before timing-out.
:type timeout_seconds: int
:param retry_limit: The number of times to retry the connection in case of
service outages.
:type retry_limit: int
:param retry_delay: The number of seconds to wait between retries (it
might be a floating point number).
:type retry_delay: float
"""
def __init__(self, databricks_conn_id='databricks_default', timeout_seconds=180, retry_limit=3,
retry_delay=1.0):
self.databricks_conn_id = databricks_conn_id
self.databricks_conn = self.get_connection(databricks_conn_id)
self.timeout_seconds = timeout_seconds
Expand Down Expand Up @@ -189,12 +219,24 @@ def submit_run(self, json):
response = self._do_api_call(SUBMIT_RUN_ENDPOINT, json)
return response['run_id']

def get_run_page_url(self, run_id):
def get_run_page_url(self, run_id: str) -> str:
"""
Retrieves run_page_url.
:param run_id: id of the run
:return: URL of the run page
"""
json = {'run_id': run_id}
response = self._do_api_call(GET_RUN_ENDPOINT, json)
return response['run_page_url']

def get_run_state(self, run_id):
def get_run_state(self, run_id: str) -> RunState:
"""
Retrieves run state of the run.
:param run_id: id of the run
:return: state of the run
"""
json = {'run_id': run_id}
response = self._do_api_call(GET_RUN_ENDPOINT, json)
state = response['state']
Expand All @@ -204,23 +246,42 @@ def get_run_state(self, run_id):
state_message = state['state_message']
return RunState(life_cycle_state, result_state, state_message)

def cancel_run(self, run_id):
def cancel_run(self, run_id: str) -> None:
"""
Cancels the run.
:param run_id: id of the run
"""
json = {'run_id': run_id}
self._do_api_call(CANCEL_RUN_ENDPOINT, json)

def restart_cluster(self, json):
def restart_cluster(self, json: dict) -> None:
"""
Restarts the cluster.
:param json: json dictionary containing cluster specification.
"""
self._do_api_call(RESTART_CLUSTER_ENDPOINT, json)

def start_cluster(self, json):
def start_cluster(self, json: dict) -> None:
"""
Starts the cluster.
:param json: json dictionary containing cluster specification.
"""
self._do_api_call(START_CLUSTER_ENDPOINT, json)

def terminate_cluster(self, json):
def terminate_cluster(self, json: dict) -> None:
"""
Terminates the cluster.
:param json: json dictionary containing cluster specification.
"""
self._do_api_call(TERMINATE_CLUSTER_ENDPOINT, json)


def _retryable_error(exception):
return isinstance(exception, requests_exceptions.ConnectionError) \
or isinstance(exception, requests_exceptions.Timeout) \
return isinstance(exception, (requests_exceptions.ConnectionError, requests_exceptions.Timeout)) \
or exception.response is not None and exception.response.status_code >= 500


Expand All @@ -234,38 +295,6 @@ def _retryable_error(exception):
]


class RunState:
"""
Utility class for the run state concept of Databricks runs.
"""
def __init__(self, life_cycle_state, result_state, state_message):
self.life_cycle_state = life_cycle_state
self.result_state = result_state
self.state_message = state_message

@property
def is_terminal(self):
if self.life_cycle_state not in RUN_LIFE_CYCLE_STATES:
raise AirflowException(
('Unexpected life cycle state: {}: If the state has '
'been introduced recently, please check the Databricks user '
'guide for troubleshooting information').format(
self.life_cycle_state))
return self.life_cycle_state in ('TERMINATED', 'SKIPPED', 'INTERNAL_ERROR')

@property
def is_successful(self):
return self.result_state == 'SUCCESS'

def __eq__(self, other):
return self.life_cycle_state == other.life_cycle_state and \
self.result_state == other.result_state and \
self.state_message == other.state_message

def __repr__(self):
return str(self.__dict__)


class _TokenAuth(AuthBase):
"""
Helper class for requests Auth field. AuthBase requires you to implement the __call__
Expand Down
63 changes: 32 additions & 31 deletions airflow/contrib/hooks/jira_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Hook for JIRA"""
from jira import JIRA
from jira.exceptions import JIRAError

Expand Down Expand Up @@ -45,41 +46,41 @@ def get_conn(self):
get_server_info = True
validate = True
extra_options = {}
conn = None
if not self.jira_conn_id:
raise AirflowException('Failed to create jira client. no jira_conn_id provided')

if self.jira_conn_id is not None:
conn = self.get_connection(self.jira_conn_id)
if conn.extra is not None:
extra_options = conn.extra_dejson
# only required attributes are taken for now,
# more can be added ex: async, logging, max_retries
conn = self.get_connection(self.jira_conn_id)
if conn.extra is not None:
extra_options = conn.extra_dejson
# only required attributes are taken for now,
# more can be added ex: async, logging, max_retries

# verify
if 'verify' in extra_options \
and extra_options['verify'].lower() == 'false':
extra_options['verify'] = False
# verify
if 'verify' in extra_options \
and extra_options['verify'].lower() == 'false':
extra_options['verify'] = False

# validate
if 'validate' in extra_options \
and extra_options['validate'].lower() == 'false':
validate = False
# validate
if 'validate' in extra_options \
and extra_options['validate'].lower() == 'false':
validate = False

if 'get_server_info' in extra_options \
and extra_options['get_server_info'].lower() == 'false':
get_server_info = False
if 'get_server_info' in extra_options \
and extra_options['get_server_info'].lower() == 'false':
get_server_info = False

try:
self.client = JIRA(conn.host,
options=extra_options,
basic_auth=(conn.login, conn.password),
get_server_info=get_server_info,
validate=validate,
proxies=self.proxies)
except JIRAError as jira_error:
raise AirflowException('Failed to create jira client, jira error: %s'
% str(jira_error))
except Exception as e:
raise AirflowException('Failed to create jira client, error: %s'
% str(e))
try:
self.client = JIRA(conn.host,
options=extra_options,
basic_auth=(conn.login, conn.password),
get_server_info=get_server_info,
validate=validate,
proxies=self.proxies)
except JIRAError as jira_error:
raise AirflowException('Failed to create jira client, jira error: %s'
% str(jira_error))
except Exception as e:
raise AirflowException('Failed to create jira client, error: %s'
% str(e))

return self.client
6 changes: 4 additions & 2 deletions airflow/contrib/hooks/mongo_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Hook for Mongo DB"""
from ssl import CERT_NONE
from pymongo import MongoClient, ReplaceOne

from airflow.hooks.base_hook import BaseHook
from pymongo import MongoClient, ReplaceOne


class MongoHook(BaseHook):
Expand Down Expand Up @@ -83,7 +84,8 @@ def get_conn(self):

return self.client

def close_conn(self):
def close_conn(self) -> None:
"""Closes connection"""
client = self.client
if client is not None:
client.close()
Expand Down
1 change: 1 addition & 0 deletions airflow/contrib/hooks/redis_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from airflow.hooks.base_hook import BaseHook


# noinspection PyAbstractClass
class RedisHook(BaseHook):
"""
Wrapper for connection to interact with Redis in-memory data structure store
Expand Down
28 changes: 13 additions & 15 deletions airflow/contrib/hooks/salesforce_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,24 @@


class SalesforceHook(BaseHook):
"""
Create new connection to Salesforce and allows you to pull data out of SFDC and save it to a file.
def __init__(self, conn_id):
"""
Create new connection to Salesforce and allows you to pull data out of SFDC and save it to a file.
You can then use that file with other Airflow operators to move the data into another data source.
You can then use that file with other Airflow operators to move the data into another data source.
:param conn_id: the name of the connection that has the parameters we need to connect to Salesforce.
The connection should be type `http` and include a user's security token in the `Extras` field.
:type conn_id: str
:param conn_id: the name of the connection that has the parameters we need to connect to Salesforce.
The connection should be type `http` and include a user's security token in the `Extras` field.
:type conn_id: str
.. note::
For the HTTP connection type, you can include a
JSON structure in the `Extras` field.
We need a user's security token to connect to Salesforce.
So we define it in the `Extras` field as `{"security_token":"YOUR_SECURITY_TOKEN"}`
.. note::
For the HTTP connection type, you can include a
JSON structure in the `Extras` field.
We need a user's security token to connect to Salesforce.
So we define it in the `Extras` field as:
`{"security_token":"YOUR_SECURITY_TOKEN"}`
"""

"""
super().__init__(conn_id)
def __init__(self, conn_id):
self.conn_id = conn_id
self.conn = None

Expand Down
Loading

0 comments on commit 6ad628c

Please sign in to comment.