Skip to content

Commit

Permalink
ShortCircuitOperator push XCom by returnung python_callable result (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
kazanzhy authored Dec 11, 2021
1 parent 993ed93 commit 4f96450
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 1 deletion.
2 changes: 1 addition & 1 deletion airflow/operators/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ def execute(self, context: Dict):

if condition:
self.log.info('Proceeding with downstream tasks...')
return
return condition

self.log.info('Skipping downstream tasks...')

Expand Down
19 changes: 19 additions & 0 deletions tests/operators/test_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,25 @@ def test_clear_skipped_downstream_task(self):
else:
raise ValueError(f'Invalid task id {ti.task_id} found!')

def test_xcom_push(self):
dag = DAG(
'shortcircuit_operator_test_xcom_push',
default_args={'owner': 'airflow', 'start_date': DEFAULT_DATE},
schedule_interval=INTERVAL,
)
short_op = ShortCircuitOperator(task_id='make_choice', dag=dag, python_callable=lambda: 'signature')
dag.clear()
dr = dag.create_dagrun(
run_type=DagRunType.MANUAL,
start_date=timezone.utcnow(),
execution_date=DEFAULT_DATE,
state=State.RUNNING,
)
short_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
tis = dr.get_task_instances()
xcom_value = tis[0].xcom_pull(task_ids='make_choice', key='return_value')
assert xcom_value == 'signature'


virtualenv_string_args: List[str] = []

Expand Down

0 comments on commit 4f96450

Please sign in to comment.