diff --git a/providers/src/airflow/providers/openlineage/plugins/adapter.py b/providers/src/airflow/providers/openlineage/plugins/adapter.py index e64109911926c..450cf38f0024a 100644 --- a/providers/src/airflow/providers/openlineage/plugins/adapter.py +++ b/providers/src/airflow/providers/openlineage/plugins/adapter.py @@ -377,6 +377,7 @@ def dag_success( clear_number: int, dag_run_state: DagRunState, task_ids: list[str], + run_facets: dict[str, RunFacet], ): try: event = RunEvent( @@ -390,6 +391,7 @@ def dag_success( facets={ **get_airflow_state_run_facet(dag_id, run_id, task_ids, dag_run_state), **get_airflow_debug_facet(), + **run_facets, }, ), inputs=[], @@ -413,6 +415,7 @@ def dag_failed( dag_run_state: DagRunState, task_ids: list[str], msg: str, + run_facets: dict[str, RunFacet], ): try: event = RunEvent( @@ -431,6 +434,7 @@ def dag_failed( ), **get_airflow_state_run_facet(dag_id, run_id, task_ids, dag_run_state), **get_airflow_debug_facet(), + **run_facets, }, ), inputs=[], diff --git a/providers/src/airflow/providers/openlineage/plugins/listener.py b/providers/src/airflow/providers/openlineage/plugins/listener.py index aefd534f155e1..c1da206c9872f 100644 --- a/providers/src/airflow/providers/openlineage/plugins/listener.py +++ b/providers/src/airflow/providers/openlineage/plugins/listener.py @@ -500,6 +500,7 @@ def on_dag_run_success(self, dag_run: DagRun, msg: str) -> None: task_ids = DagRun._get_partial_task_ids(dag_run.dag) else: task_ids = dag_run.dag.task_ids if dag_run.dag and dag_run.dag.partial else None + self.submit_callable( self.adapter.dag_success, dag_id=dag_run.dag_id, @@ -509,6 +510,7 @@ def on_dag_run_success(self, dag_run: DagRun, msg: str) -> None: clear_number=dag_run.clear_number, task_ids=task_ids, dag_run_state=dag_run.get_state(), + run_facets={**get_airflow_dag_run_facet(dag_run)}, ) except BaseException as e: self.log.warning("OpenLineage received exception in method on_dag_run_success", exc_info=e) @@ -543,6 +545,7 @@ def on_dag_run_failed(self, dag_run: DagRun, msg: str) -> None: dag_run_state=dag_run.get_state(), task_ids=task_ids, msg=msg, + run_facets={**get_airflow_dag_run_facet(dag_run)}, ) except BaseException as e: self.log.warning("OpenLineage received exception in method on_dag_run_failed", exc_info=e) diff --git a/providers/tests/openlineage/plugins/test_adapter.py b/providers/tests/openlineage/plugins/test_adapter.py index a7f8008532334..fd7f01ff61e49 100644 --- a/providers/tests/openlineage/plugins/test_adapter.py +++ b/providers/tests/openlineage/plugins/test_adapter.py @@ -713,6 +713,7 @@ def test_emit_dag_complete_event( clear_number=0, dag_run_state=DagRunState.SUCCESS, task_ids=["task_0", "task_1", "task_2.test"], + run_facets={"airflowDagRun": AirflowDagRunFacet(dag={"description": "dag desc"}, dagRun=dag_run)}, ) client.emit.assert_called_once_with( @@ -731,6 +732,7 @@ def test_emit_dag_complete_event( }, ), "debug": AirflowDebugRunFacet(packages=ANY), + "airflowDagRun": AirflowDagRunFacet(dag={"description": "dag desc"}, dagRun=dag_run), }, ), job=Job( @@ -804,6 +806,7 @@ def test_emit_dag_failed_event( dag_run_state=DagRunState.FAILED, task_ids=["task_0", "task_1", "task_2.test"], msg="error msg", + run_facets={"airflowDagRun": AirflowDagRunFacet(dag={"description": "dag desc"}, dagRun=dag_run)}, ) client.emit.assert_called_once_with( @@ -825,6 +828,7 @@ def test_emit_dag_failed_event( }, ), "debug": AirflowDebugRunFacet(packages=ANY), + "airflowDagRun": AirflowDagRunFacet(dag={"description": "dag desc"}, dagRun=dag_run), }, ), job=Job(