diff --git a/docs/apache-airflow-providers-openlineage/index.rst b/docs/apache-airflow-providers-openlineage/index.rst
index e0c1db9a757a6..2b1ccda0a7a28 100644
--- a/docs/apache-airflow-providers-openlineage/index.rst
+++ b/docs/apache-airflow-providers-openlineage/index.rst
@@ -56,6 +56,14 @@
PyPI Repository
Installing from sources
+.. toctree::
+ :hidden:
+ :maxdepth: 1
+ :caption: System tests
+
+ System Tests <_api/tests/system/openlineage/index>
+
+
.. THE REMAINDER OF THE FILE IS AUTOMATICALLY GENERATED. IT WILL BE OVERWRITTEN AT RELEASE TIME!
diff --git a/providers/src/airflow/providers/openlineage/plugins/adapter.py b/providers/src/airflow/providers/openlineage/plugins/adapter.py
index 535618529c135..ec836b541a2a3 100644
--- a/providers/src/airflow/providers/openlineage/plugins/adapter.py
+++ b/providers/src/airflow/providers/openlineage/plugins/adapter.py
@@ -157,10 +157,10 @@ def emit(self, event: RunEvent):
stack.enter_context(Stats.timer("ol.emit.attempts"))
self._client.emit(redacted_event)
self.log.debug("Successfully emitted OpenLineage event of id %s", event.run.runId)
- except Exception as e:
+ except Exception:
Stats.incr("ol.emit.failed")
self.log.warning("Failed to emit OpenLineage event of id %s", event.run.runId)
- self.log.debug("OpenLineage emission failure: %s", e)
+            # No %-style arguments are passed, so the message must not contain a placeholder
+            # (logging would print the literal "%s"); exc_info=True attaches the active
+            # exception and its traceback to the record instead.
+            self.log.debug("OpenLineage emission failure", exc_info=True)
return redacted_event
diff --git a/providers/tests/system/openlineage/__init__.py b/providers/tests/system/openlineage/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/providers/tests/system/openlineage/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/providers/tests/system/openlineage/conftest.py b/providers/tests/system/openlineage/conftest.py
new file mode 100644
index 0000000000000..48d568b307ef8
--- /dev/null
+++ b/providers/tests/system/openlineage/conftest.py
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow.listeners.listener import get_listener_manager
+from airflow.providers.openlineage.plugins.listener import OpenLineageListener
+
+from providers.tests.system.openlineage.transport.variable import VariableTransport
+
+
+@pytest.fixture(autouse=True)
+def set_transport_variable():
+    """
+    Autouse fixture that makes the OpenLineage listener emit through ``VariableTransport``.
+
+    The listener manager is cleared, a new ``OpenLineageListener`` is registered whose
+    client transport is replaced with ``VariableTransport``, and the manager is cleared
+    again once the test has finished.
+    """
+    manager = get_listener_manager()
+    manager.clear()
+
+    ol_listener = OpenLineageListener()
+    adapter = ol_listener.adapter
+    # Create the client first, then swap its transport for the Variable-backed one.
+    adapter._client = adapter.get_or_create_openlineage_client()
+    adapter._client.transport = VariableTransport()
+    manager.add_listener(ol_listener)
+
+    yield
+
+    manager.clear()
diff --git a/providers/tests/system/openlineage/example_openlineage.json b/providers/tests/system/openlineage/example_openlineage.json
new file mode 100644
index 0000000000000..0db8bc53e2288
--- /dev/null
+++ b/providers/tests/system/openlineage/example_openlineage.json
@@ -0,0 +1,38 @@
+[
+ {
+ "eventType": "START",
+ "eventTime": "{{ is_datetime(result) }}",
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ },
+ "job": {
+ "namespace": "default",
+ "name": "openlineage_basic_dag.do_nothing_task",
+ "facets": {
+ "jobType": {
+ "integration": "AIRFLOW",
+ "jobType": "TASK",
+ "processingType": "BATCH"
+ }
+ }
+ }
+ },
+ {
+ "eventType": "COMPLETE",
+ "eventTime": "{{ is_datetime(result) }}",
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ },
+ "job": {
+ "namespace": "default",
+ "name": "openlineage_basic_dag.do_nothing_task",
+ "facets": {
+ "jobType": {
+ "integration": "AIRFLOW",
+ "jobType": "TASK",
+ "processingType": "BATCH"
+ }
+ }
+ }
+ }
+]
diff --git a/providers/tests/system/openlineage/example_openlineage.py b/providers/tests/system/openlineage/example_openlineage.py
new file mode 100644
index 0000000000000..d76d303ef45c7
--- /dev/null
+++ b/providers/tests/system/openlineage/example_openlineage.py
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import os
+from datetime import datetime
+
+from airflow import DAG
+from airflow.providers.standard.operators.python import PythonOperator
+
+from providers.tests.system.openlineage.operator import OpenLineageTestOperator
+
+
+def do_nothing():
+    """Perform no work; used as the python callable of ``do_nothing_task``."""
+    return None
+
+
+# Passed to the DAG below as ``default_args``; note that ``start_date`` is also given
+# directly to the DAG constructor with the same value.
+default_args = {"start_date": datetime(2021, 1, 1), "retries": 1}
+
+# Instantiate the DAG
+with DAG(
+    "openlineage_basic_dag",
+    default_args=default_args,
+    start_date=datetime(2021, 1, 1),
+    schedule=None,
+    catchup=False,
+) as dag:
+    # Task whose OpenLineage events are checked by ``check_events``.
+    nothing_task = PythonOperator(task_id="do_nothing_task", python_callable=do_nothing)
+
+    # Compares stored events with the templates read from the JSON file.
+    # NOTE(review): the path is built from AIRFLOW_HOME; if that variable is unset the
+    # f-string produces a path starting with "None" - confirm it is always set here.
+    check_events = OpenLineageTestOperator(
+        task_id="check_events",
+        file_path=f"{os.getenv('AIRFLOW_HOME')}/dags/providers/tests/system/openlineage/example_openlineage.json",
+    )
+
+    # The check runs only after the observed task.
+    nothing_task >> check_events
+
+
+from tests_common.test_utils.system_tests import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/providers/tests/system/openlineage/example_openlineage_mapped_sensor.json b/providers/tests/system/openlineage/example_openlineage_mapped_sensor.json
new file mode 100644
index 0000000000000..c112fb35448ad
--- /dev/null
+++ b/providers/tests/system/openlineage/example_openlineage_mapped_sensor.json
@@ -0,0 +1,75 @@
+[
+ {
+ "eventType": "START",
+ "eventTime": "{{ is_datetime(result) }}",
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ },
+ "job": {
+ "namespace": "default",
+ "name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds",
+ "facets": {
+ "jobType": {
+ "integration": "AIRFLOW",
+ "jobType": "TASK",
+ "processingType": "BATCH"
+ }
+ }
+ }
+ },
+ {
+ "eventType": "COMPLETE",
+ "eventTime": "{{ is_datetime(result) }}",
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ },
+ "job": {
+ "namespace": "default",
+ "name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds",
+ "facets": {
+ "jobType": {
+ "integration": "AIRFLOW",
+ "jobType": "TASK",
+ "processingType": "BATCH"
+ }
+ }
+ }
+ },
+ {
+ "eventType": "START",
+ "eventTime": "{{ is_datetime(result) }}",
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ },
+ "job": {
+ "namespace": "default",
+ "name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds",
+ "facets": {
+ "jobType": {
+ "integration": "AIRFLOW",
+ "jobType": "TASK",
+ "processingType": "BATCH"
+ }
+ }
+ }
+ },
+
+ {
+ "eventType": "COMPLETE",
+ "eventTime": "{{ is_datetime(result) }}",
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ },
+ "job": {
+ "namespace": "default",
+ "name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds",
+ "facets": {
+ "jobType": {
+ "integration": "AIRFLOW",
+ "jobType": "TASK",
+ "processingType": "BATCH"
+ }
+ }
+ }
+ }
+]
diff --git a/providers/tests/system/openlineage/example_openlineage_mapped_sensor.py b/providers/tests/system/openlineage/example_openlineage_mapped_sensor.py
new file mode 100644
index 0000000000000..98d9b02884ed9
--- /dev/null
+++ b/providers/tests/system/openlineage/example_openlineage_mapped_sensor.py
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import os
+from datetime import datetime, timedelta
+
+from airflow import DAG
+from airflow.models import Variable
+from airflow.providers.standard.operators.python import PythonOperator
+from airflow.providers.standard.sensors.time_delta import TimeDeltaSensor
+
+from providers.tests.system.openlineage.operator import OpenLineageTestOperator
+
+
+def my_task(task_number):
+    """Print the current working directory and the number of the executing task."""
+    cwd = os.getcwd()
+    print(cwd)
+    print(f"Executing task number: {task_number}")
+
+
+def check_start_amount_func():
+    """
+    Verify that at least two START events were stored for the sensor task.
+
+    :raises ValueError: if fewer than two events are stored under the sensor's START key.
+    """
+    events = Variable.get(
+        "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds.event.start",
+        deserialize_json=True,
+    )
+    event_count = len(events)
+    if event_count < 2:
+        raise ValueError(f"Expected at least 2 events, got {event_count}")
+
+
+# DAG exercising OpenLineage events for a sensor followed by two python tasks.
+with DAG(
+    dag_id="openlineage_sensor_mapped_tasks_dag",
+    start_date=datetime(2021, 1, 1),
+    schedule=None,
+    catchup=False,
+) as dag:
+    # Sensor configured in reschedule mode with a 5 second poke interval and a 10 second delta.
+    # NOTE(review): presumably rescheduling makes the task start more than once, which is why
+    # check_start_amount_func expects at least two START events - confirm.
+    wait_for_10_seconds = TimeDeltaSensor(
+        task_id="wait_for_10_seconds",
+        mode="reschedule",
+        poke_interval=5,
+        delta=timedelta(seconds=10),
+    )
+
+    # Despite the name, these are two ordinary PythonOperators built in a list
+    # comprehension (mapped_task_0 and mapped_task_1).
+    mapped_tasks = [
+        PythonOperator(
+            task_id=f"mapped_task_{i}",
+            python_callable=my_task,
+            op_args=[i],
+        )
+        for i in range(2)
+    ]
+
+    # The task id is "check_order" although the callable checks the number of START events.
+    check_start_amount = PythonOperator(
+        task_id="check_order",
+        python_callable=check_start_amount_func,
+    )
+
+    # Duplicate events per key are allowed here because the sensor may emit several of them.
+    check_events = OpenLineageTestOperator(
+        task_id="check_events",
+        file_path=f"{os.getenv('AIRFLOW_HOME')}/dags/providers/tests/system/openlineage/example_openlineage_mapped_sensor.json",
+        allow_duplicate_events=True,
+    )
+
+    # sensor -> both python tasks -> START-count check -> template comparison
+    wait_for_10_seconds >> mapped_tasks >> check_start_amount >> check_events
+
+
+from tests_common.test_utils.system_tests import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/providers/tests/system/openlineage/operator.py b/providers/tests/system/openlineage/operator.py
new file mode 100644
index 0000000000000..a305ca7026a7f
--- /dev/null
+++ b/providers/tests/system/openlineage/operator.py
@@ -0,0 +1,214 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+import uuid
+from typing import TYPE_CHECKING, Any
+from urllib.parse import urlparse
+
+from dateutil.parser import parse
+from jinja2 import Environment
+
+from airflow.models.operator import BaseOperator
+from airflow.models.variable import Variable
+from airflow.utils.session import create_session
+
+if TYPE_CHECKING:
+ from airflow.utils.context import Context
+
+log = logging.getLogger(__name__)
+
+
+def any(result: Any) -> Any:
+    """
+    Return ``result`` unchanged.
+
+    Registered as the ``any`` jinja global by ``setup_jinja``.
+    NOTE: this shadows the builtin ``any`` inside this module.
+    """
+    return result
+
+
+def is_datetime(result: Any) -> str:
+    """Return ``"true"`` if ``parse`` accepts ``result``, otherwise ``"false"``."""
+    try:
+        parse(result)
+    except Exception:
+        # Any failure to parse simply means the value is not a datetime.
+        return "false"
+    return "true"
+
+
+def is_uuid(result: Any) -> str:
+    """Return ``"true"`` if ``uuid.UUID`` accepts ``result``, otherwise ``"false"``."""
+    try:
+        uuid.UUID(result)
+    except Exception:
+        # Any failure to construct a UUID simply means the value is not one.
+        return "false"
+    return "true"
+
+
+def env_var(var: str, default: str | None = None) -> str:
+    """
+    Jinja helper returning the value of the environment variable ``var``.
+
+    When the variable is not set, ``default`` is returned instead.
+
+    :raises ValueError: if the variable is not set and ``default`` is None.
+    """
+    value = os.environ.get(var, default)
+    if value is None:
+        raise ValueError(f"Env var required but not provided: '{var}'")
+    return value
+
+
+def not_match(result: str, pattern: str) -> str:
+    """
+    Check that ``pattern`` does not occur in ``result``.
+
+    :return: ``"true"`` when the pattern is absent.
+    :raises ValueError: if ``pattern`` is found in ``result``.
+    """
+    if pattern not in result:
+        return "true"
+    raise ValueError(f"Found {pattern} in {result}")
+
+
+def url_scheme_authority(url: str) -> str:
+    """Return the ``scheme://netloc`` part of ``url``."""
+    parts = urlparse(url)
+    return "://".join((parts.scheme, parts.netloc))
+
+
+def url_path(url: str) -> str:
+    """Return the path component of ``url``."""
+    parts = urlparse(url)
+    return parts.path
+
+
+def setup_jinja() -> Environment:
+    """Create a jinja ``Environment`` with the helper globals and filters defined in this module."""
+    jinja_env = Environment()
+    # Helpers callable from templates, e.g. ``{{ is_uuid(result) }}``.
+    jinja_env.globals.update(
+        {
+            "any": any,
+            "is_datetime": is_datetime,
+            "is_uuid": is_uuid,
+            "env_var": env_var,
+            "not_match": not_match,
+        }
+    )
+    # Helpers usable as filters, e.g. ``{{ result | url_path }}``.
+    jinja_env.filters.update(
+        {
+            "url_scheme_authority": url_scheme_authority,
+            "url_path": url_path,
+        }
+    )
+    return jinja_env
+
+
+def match(expected, result, env: Environment) -> bool:
+    """
+    Check if result is "equal" to expected value.
+
+    Omits keys not specified in expected value and resolves any jinja templates found.
+
+    :param expected: expected template - a dict, list, string (possibly a jinja template) or other value
+    :param result: received value that is compared with ``expected``
+    :param env: jinja environment used to render strings containing ``{{``
+    :return: True if ``result`` matches ``expected``; otherwise False, with the mismatch logged
+    """
+    if isinstance(expected, dict):
+        # Take a look only at keys present at expected dictionary
+        if not isinstance(result, dict):
+            log.error("Not a dict: %s\nExpected %s", result, expected)
+            return False
+        for k, v in expected.items():
+            if k not in result:
+                log.error("Key %s not in received event %s\nExpected %s", k, result, expected)
+                return False
+            if not match(v, result[k], env):
+                log.error(
+                    "For key %s, expected value %s not equals received %s\nExpected: %s, request: %s",
+                    k,
+                    v,
+                    result[k],
+                    expected,
+                    result,
+                )
+                return False
+    elif isinstance(expected, list):
+        # A list template can only match a list-like result. Without this guard a None or
+        # scalar result raises TypeError from len() instead of being reported as a mismatch.
+        if not isinstance(result, (list, tuple)):
+            log.error("Not a list: %s\nExpected %s", result, expected)
+            return False
+        if len(expected) != len(result):
+            log.error("Length does not match: expected %d, result: %d", len(expected), len(result))
+            return False
+        # Lists are compared element by element, in order.
+        for i, x in enumerate(expected):
+            if not match(x, result[i], env):
+                log.error(
+                    "List not matched at %d\nexpected:\n%s\nresult: \n%s",
+                    i,
+                    json.dumps(x),
+                    json.dumps(result[i]),
+                )
+                return False
+    elif isinstance(expected, str):
+        if "{{" in expected:
+            # Evaluate jinja: in some cases, we want to check only if key exists, or if
+            # value has the right type
+            try:
+                rendered = env.from_string(expected).render(result=result)
+            except ValueError as e:
+                log.error("Error rendering jinja template %s: %s", expected, e)
+                return False
+            # A template passes when it renders to the literal "true" or to the received value.
+            if rendered == "true" or rendered == result:
+                return True
+            log.error("Rendered value %s does not equal 'true' or %s", rendered, result)
+            return False
+        elif expected != result:
+            log.error("Expected value %s does not equal result %s", expected, result)
+            return False
+    elif expected != result:
+        log.error("Object of type %s: %s does not match %s", type(expected), expected, result)
+        return False
+    return True
+
+
+class OpenLineageTestOperator(BaseOperator):
+    """
+    This operator is added for system testing purposes.
+
+    It compares expected event templates set on initialization with ones emitted by OpenLineage integration
+    and stored in Variables by VariableTransport.
+
+    :param event_templates: dictionary where key is the key used by VariableTransport in format of
+        <job_name>.event.<event_type> (e.g. ``openlineage_basic_dag.do_nothing_task.event.start``),
+        and value is event template (fragment) that need to be in received events.
+    :param file_path: alternatively, file_path pointing to file with event templates will be used
+    :param env: jinja environment used to render event templates; if None, a new environment is
+        created with ``setup_jinja()`` for this operator instance
+    :param allow_duplicate_events: if set to True, allows multiple events for the same key
+    :param clear_variables: if set to True, clears all variables after checking events
+    :raises: ValueError if the received events do not match with expected ones.
+    """
+
+    def __init__(
+        self,
+        event_templates: dict[str, dict] | None = None,
+        file_path: str | None = None,
+        env: Environment | None = None,
+        allow_duplicate_events: bool = False,
+        clear_variables: bool = True,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.event_templates = event_templates
+        self.file_path = file_path
+        # Create the default environment per instance: a call in the signature default would be
+        # evaluated once at definition time and shared (mutably) by every operator.
+        self.env = env if env is not None else setup_jinja()
+        self.multiple_events = allow_duplicate_events
+        self.delete = clear_variables
+        if self.event_templates and self.file_path:
+            raise ValueError("Can't pass both event_templates and file_path")
+
+    def execute(self, context: Context) -> None:
+        """
+        Compare the events stored in Variables with the expected templates.
+
+        :raises ValueError: if no templates were provided, an expected event is missing, more
+            than one event is stored for a key while duplicates are not allowed, or the stored
+            event does not match its template.
+        """
+        if self.file_path is not None:
+            # Templates from the file are keyed the same way VariableTransport keys events.
+            self.event_templates = {}
+            with open(self.file_path) as f:  # type: ignore[arg-type]
+                events = json.load(f)
+            for event in events:
+                key = event["job"]["name"] + ".event." + event["eventType"].lower()
+                self.event_templates[key] = event
+        if self.event_templates is None:
+            # Fail with a clear message instead of an AttributeError on None below.
+            raise ValueError("Either event_templates or file_path must be provided")
+        for key, template in self.event_templates.items():
+            send_event = Variable.get(key=key, deserialize_json=True)
+            if len(send_event) == 0:
+                raise ValueError(f"No event for key {key}")
+            if len(send_event) != 1 and not self.multiple_events:
+                raise ValueError(f"Expected one event for key {key}, got {len(send_event)}")
+            self.log.info("Events: %s, %s, %s", send_event, len(send_event), type(send_event))
+            # Only the first stored event is compared, even when duplicates are allowed.
+            if not match(template, json.loads(send_event[0]), self.env):
+                raise ValueError("Event received does not match one specified in test")
+        if self.delete:
+            # Removes every Variable row, not only the keys checked above.
+            with create_session() as session:
+                session.query(Variable).delete()
diff --git a/providers/tests/system/openlineage/transport/__init__.py b/providers/tests/system/openlineage/transport/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/providers/tests/system/openlineage/transport/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/providers/tests/system/openlineage/transport/variable.py b/providers/tests/system/openlineage/transport/variable.py
new file mode 100644
index 0000000000000..beeeac5aff6d0
--- /dev/null
+++ b/providers/tests/system/openlineage/transport/variable.py
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from openlineage.client.serde import Serde
+from openlineage.client.transport import Transport, get_default_factory
+
+from airflow.models.variable import Variable
+
+if TYPE_CHECKING:
+ from openlineage.client.client import Event
+
+
+class VariableTransport(Transport):
+    """
+    This transport sends OpenLineage events to Variables.
+
+    Key schema is <job_name>.event.<event_type>.
+    It's made to be used in system tests, stored data read by OpenLineageTestOperator.
+    """
+
+    kind = "variable"
+
+    def emit(self, event: Event) -> None:
+        """Append the serialized event to the JSON list stored in the Variable for its key."""
+        key = f"{event.job.name}.event.{event.eventType.value.lower()}"  # type: ignore[union-attr]
+        serialized = Serde.to_json(event)
+        stored = Variable.get(key=key, default_var=None, deserialize_json=True)
+        # Start a new list for the first event of a key, otherwise extend the stored one.
+        events = [serialized] if stored is None else stored + [serialized]
+        Variable.set(key=key, value=events, serialize_json=True)
+
+
+# Register the transport with the default factory under its ``kind``.
+get_default_factory().register_transport(VariableTransport.kind, VariableTransport)