diff --git a/astronomer/providers/amazon/aws/example_dags/example_emr_sensor.py b/astronomer/providers/amazon/aws/example_dags/example_emr_sensor.py index b5cee45d5..6baf3fb1a 100644 --- a/astronomer/providers/amazon/aws/example_dags/example_emr_sensor.py +++ b/astronomer/providers/amazon/aws/example_dags/example_emr_sensor.py @@ -109,7 +109,7 @@ def check_dag_status(**kwargs: Any) -> None: # [START howto_sensor_emr_step_async] watch_step = EmrStepSensorAsync( task_id="watch_step", - job_flow_id=create_job_flow.output, # type: ignore[arg-type] + job_flow_id=create_job_flow.output, step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}", aws_conn_id=AWS_CONN_ID, ) diff --git a/astronomer/providers/amazon/aws/sensors/emr.py b/astronomer/providers/amazon/aws/sensors/emr.py index ff2e21934..6314839d6 100644 --- a/astronomer/providers/amazon/aws/sensors/emr.py +++ b/astronomer/providers/amazon/aws/sensors/emr.py @@ -1,5 +1,6 @@ from __future__ import annotations +import warnings from datetime import timedelta from typing import Any @@ -12,105 +13,48 @@ ) from astronomer.providers.amazon.aws.triggers.emr import ( - EmrContainerSensorTrigger, EmrJobFlowSensorTrigger, - EmrStepSensorTrigger, ) -from astronomer.providers.utils.sensor_util import poke, raise_error_or_skip_exception +from astronomer.providers.utils.sensor_util import raise_error_or_skip_exception from astronomer.providers.utils.typing_compat import Context class EmrContainerSensorAsync(EmrContainerSensor): """ - EmrContainerSensorAsync is async version of EmrContainerSensor, - Asks for the state of the job run until it reaches a failure state or success state. - If the job run fails, the task will fail. - - :param virtual_cluster_id: Reference Emr cluster id - :param job_id: job_id to check the state - :param max_retries: Number of times to poll for query state before - returning the current state, defaults to None - :param aws_conn_id: aws connection to use, defaults to ``aws_default`` - :param poll_interval: Time in seconds to wait between two consecutive call to - check query status on athena, defaults to 10 + This class is deprecated. + Please use :class: `~airflow.providers.amazon.aws.sensors.emr.EmrContainerSensor`. """ - def execute(self, context: Context) -> None: - """Defers trigger class to poll for state of the job run until it reaches a failure state or success state""" - if not poke(self, context): - self.defer( - timeout=timedelta(seconds=self.timeout), - trigger=EmrContainerSensorTrigger( - virtual_cluster_id=self.virtual_cluster_id, - job_id=self.job_id, - max_tries=self.max_retries, - aws_conn_id=self.aws_conn_id, - poll_interval=self.poll_interval, - ), - method_name="execute_complete", - ) - - # Ignoring the override type check because the parent class specifies "context: Any" but specifying it as - # "context: Context" is accurate as it's more specific. - def execute_complete(self, context: Context, event: dict[str, str]) -> None: # type: ignore[override] - """ - Callback for when the trigger fires - returns immediately. - Relies on trigger to throw an exception, otherwise it assumes execution was - successful. - """ - if event: - if event["status"] == "error": - raise_error_or_skip_exception(self.soft_fail, event["message"]) - self.log.info(event["message"]) - return None + def __init__(self, *args, **kwargs) -> None: + warnings.warn( + ( + "This module is deprecated. " + "Please use `airflow.providers.amazon.aws.sensors.emr.EmrContainerSensor` " + "and set deferrable to True instead." + ), + DeprecationWarning, + stacklevel=2, + ) + return super().__init__(*args, deferrable=True, **kwargs) class EmrStepSensorAsync(EmrStepSensor): """ - Async (deferring) version of EmrStepSensor - - Asks for the state of the step until it reaches any of the target states. - If the sensor errors out, then the task will fail - With the default target states, sensor waits step to be COMPLETED. - - For more details see - - https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html#EMR.Client.describe_step - - :param job_flow_id: job_flow_id which contains the step check the state of - :param step_id: step to check the state of - :param target_states: the target states, sensor waits until - step reaches any of these states - :param failed_states: the failure states, sensor fails when - step reaches any of these states + This class is deprecated. + Please use :class: `~airflow.providers.amazon.aws.sensors.emr.EmrStepSensor`. """ - def execute(self, context: Context) -> None: - """Deferred and give control to trigger""" - if not poke(self, context): - self.defer( - timeout=timedelta(seconds=self.timeout), - trigger=EmrStepSensorTrigger( - job_flow_id=self.job_flow_id, - step_id=self.step_id, - target_states=self.target_states, - failed_states=self.failed_states, - aws_conn_id=self.aws_conn_id, - poke_interval=self.poke_interval, - ), - method_name="execute_complete", - ) - - def execute_complete(self, context: Context, event: dict[str, Any]) -> None: # type: ignore[override] - """ - Callback for when the trigger fires - returns immediately. - Relies on trigger to throw an exception, otherwise it assumes execution was - successful. - """ - if event: - if event["status"] == "error": - raise_error_or_skip_exception(self.soft_fail, event["message"]) - self.log.info(event.get("message")) - self.log.info("%s completed successfully.", self.job_flow_id) + def __init__(self, *args, **kwargs) -> None: + warnings.warn( + ( + "This module is deprecated. " + "Please use `airflow.providers.amazon.aws.sensors.emr.EmrStepSensor` " + "and set deferrable to True instead." + ), + DeprecationWarning, + stacklevel=2, + ) + return super().__init__(*args, deferrable=True, **kwargs) class EmrJobFlowSensorAsync(EmrJobFlowSensor):