From 7cc36104ab9c68e2246795612fdd9713ad7aa977 Mon Sep 17 00:00:00 2001 From: Maciej Obuchowski Date: Wed, 27 Nov 2024 02:16:21 +0100 Subject: [PATCH] add basic system tests for OpenLineage (#43643) Signed-off-by: Maciej Obuchowski --- .../index.rst | 8 + .../providers/openlineage/plugins/adapter.py | 4 +- .../tests/system/openlineage/__init__.py | 16 ++ .../tests/system/openlineage/conftest.py | 36 +++ .../openlineage/example_openlineage.json | 38 ++++ .../system/openlineage/example_openlineage.py | 55 +++++ .../example_openlineage_mapped_sensor.json | 75 ++++++ .../example_openlineage_mapped_sensor.py | 81 +++++++ .../tests/system/openlineage/operator.py | 214 ++++++++++++++++++ .../system/openlineage/transport/__init__.py | 16 ++ .../system/openlineage/transport/variable.py | 49 ++++ 11 files changed, 590 insertions(+), 2 deletions(-) create mode 100644 providers/tests/system/openlineage/__init__.py create mode 100644 providers/tests/system/openlineage/conftest.py create mode 100644 providers/tests/system/openlineage/example_openlineage.json create mode 100644 providers/tests/system/openlineage/example_openlineage.py create mode 100644 providers/tests/system/openlineage/example_openlineage_mapped_sensor.json create mode 100644 providers/tests/system/openlineage/example_openlineage_mapped_sensor.py create mode 100644 providers/tests/system/openlineage/operator.py create mode 100644 providers/tests/system/openlineage/transport/__init__.py create mode 100644 providers/tests/system/openlineage/transport/variable.py diff --git a/docs/apache-airflow-providers-openlineage/index.rst b/docs/apache-airflow-providers-openlineage/index.rst index e0c1db9a757a6..2b1ccda0a7a28 100644 --- a/docs/apache-airflow-providers-openlineage/index.rst +++ b/docs/apache-airflow-providers-openlineage/index.rst @@ -56,6 +56,14 @@ PyPI Repository Installing from sources +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: System tests + + System Tests <_api/tests/system/openlineage/index> + + .. THE REMAINDER OF THE FILE IS AUTOMATICALLY GENERATED. IT WILL BE OVERWRITTEN AT RELEASE TIME! diff --git a/providers/src/airflow/providers/openlineage/plugins/adapter.py b/providers/src/airflow/providers/openlineage/plugins/adapter.py index 535618529c135..ec836b541a2a3 100644 --- a/providers/src/airflow/providers/openlineage/plugins/adapter.py +++ b/providers/src/airflow/providers/openlineage/plugins/adapter.py @@ -157,10 +157,10 @@ def emit(self, event: RunEvent): stack.enter_context(Stats.timer("ol.emit.attempts")) self._client.emit(redacted_event) self.log.debug("Successfully emitted OpenLineage event of id %s", event.run.runId) - except Exception as e: + except Exception: Stats.incr("ol.emit.failed") self.log.warning("Failed to emit OpenLineage event of id %s", event.run.runId) - self.log.debug("OpenLineage emission failure: %s", e) + self.log.debug("OpenLineage emission failure: %s", exc_info=True) return redacted_event diff --git a/providers/tests/system/openlineage/__init__.py b/providers/tests/system/openlineage/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/tests/system/openlineage/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/tests/system/openlineage/conftest.py b/providers/tests/system/openlineage/conftest.py new file mode 100644 index 0000000000000..48d568b307ef8 --- /dev/null +++ b/providers/tests/system/openlineage/conftest.py @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pytest + +from airflow.listeners.listener import get_listener_manager +from airflow.providers.openlineage.plugins.listener import OpenLineageListener + +from providers.tests.system.openlineage.transport.variable import VariableTransport + + +@pytest.fixture(autouse=True) +def set_transport_variable(): + lm = get_listener_manager() + lm.clear() + listener = OpenLineageListener() + listener.adapter._client = listener.adapter.get_or_create_openlineage_client() + listener.adapter._client.transport = VariableTransport() + lm.add_listener(listener) + yield + lm.clear() diff --git a/providers/tests/system/openlineage/example_openlineage.json b/providers/tests/system/openlineage/example_openlineage.json new file mode 100644 index 0000000000000..0db8bc53e2288 --- /dev/null +++ b/providers/tests/system/openlineage/example_openlineage.json @@ -0,0 +1,38 @@ +[ + { + "eventType": "START", + "eventTime": "{{ is_datetime(result) }}", + "run": { + "runId": "{{ is_uuid(result) }}" + }, + "job": { + "namespace": "default", + "name": "openlineage_basic_dag.do_nothing_task", + "facets": { + "jobType": { + "integration": "AIRFLOW", + "jobType": "TASK", + "processingType": "BATCH" + } + } + } + }, + { + "eventType": "COMPLETE", + "eventTime": "{{ is_datetime(result) }}", + "run": { + "runId": "{{ is_uuid(result) }}" + }, + "job": { + "namespace": "default", + "name": "openlineage_basic_dag.do_nothing_task", + "facets": { + "jobType": { + "integration": "AIRFLOW", + "jobType": "TASK", + "processingType": "BATCH" + } + } + } + } +] diff --git a/providers/tests/system/openlineage/example_openlineage.py b/providers/tests/system/openlineage/example_openlineage.py new file mode 100644 index 0000000000000..d76d303ef45c7 --- /dev/null +++ b/providers/tests/system/openlineage/example_openlineage.py @@ -0,0 +1,55 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import os +from datetime import datetime + +from airflow import DAG +from airflow.providers.standard.operators.python import PythonOperator + +from providers.tests.system.openlineage.operator import OpenLineageTestOperator + + +def do_nothing(): + pass + + +default_args = {"start_date": datetime(2021, 1, 1), "retries": 1} + +# Instantiate the DAG +with DAG( + "openlineage_basic_dag", + default_args=default_args, + start_date=datetime(2021, 1, 1), + schedule=None, + catchup=False, +) as dag: + nothing_task = PythonOperator(task_id="do_nothing_task", python_callable=do_nothing) + + check_events = OpenLineageTestOperator( + task_id="check_events", + file_path=f"{os.getenv('AIRFLOW_HOME')}/dags/providers/tests/system/openlineage/example_openlineage.json", + ) + + nothing_task >> check_events + + +from tests_common.test_utils.system_tests import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/providers/tests/system/openlineage/example_openlineage_mapped_sensor.json b/providers/tests/system/openlineage/example_openlineage_mapped_sensor.json new file mode 100644 index 0000000000000..c112fb35448ad --- /dev/null +++ b/providers/tests/system/openlineage/example_openlineage_mapped_sensor.json @@ -0,0 +1,75 @@ +[ + { + "eventType": "START", + "eventTime": "{{ is_datetime(result) }}", + "run": { + "runId": "{{ is_uuid(result) }}" + }, + "job": { + "namespace": "default", + "name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds", + "facets": { + "jobType": { + "integration": "AIRFLOW", + "jobType": "TASK", + "processingType": "BATCH" + } + } + } + }, + { + "eventType": "COMPLETE", + "eventTime": "{{ is_datetime(result) }}", + "run": { + "runId": "{{ is_uuid(result) }}" + }, + "job": { + "namespace": "default", + "name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds", + "facets": { + "jobType": { + "integration": "AIRFLOW", + "jobType": "TASK", + "processingType": "BATCH" + } + } + } + }, + { + "eventType": "START", + "eventTime": "{{ is_datetime(result) }}", + "run": { + "runId": "{{ is_uuid(result) }}" + }, + "job": { + "namespace": "default", + "name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds", + "facets": { + "jobType": { + "integration": "AIRFLOW", + "jobType": "TASK", + "processingType": "BATCH" + } + } + } + }, + + { + "eventType": "COMPLETE", + "eventTime": "{{ is_datetime(result) }}", + "run": { + "runId": "{{ is_uuid(result) }}" + }, + "job": { + "namespace": "default", + "name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds", + "facets": { + "jobType": { + "integration": "AIRFLOW", + "jobType": "TASK", + "processingType": "BATCH" + } + } + } + } +] diff --git a/providers/tests/system/openlineage/example_openlineage_mapped_sensor.py b/providers/tests/system/openlineage/example_openlineage_mapped_sensor.py new file mode 100644 index 0000000000000..98d9b02884ed9 --- /dev/null +++ b/providers/tests/system/openlineage/example_openlineage_mapped_sensor.py @@ -0,0 +1,81 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import os +from datetime import datetime, timedelta + +from airflow import DAG +from airflow.models import Variable +from airflow.providers.standard.operators.python import PythonOperator +from airflow.providers.standard.sensors.time_delta import TimeDeltaSensor + +from providers.tests.system.openlineage.operator import OpenLineageTestOperator + + +def my_task(task_number): + print(os.getcwd()) + print(f"Executing task number: {task_number}") + + +def check_start_amount_func(): + start_sensor_key = "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds.event.start" # type: ignore[union-attr] + events = Variable.get(start_sensor_key, deserialize_json=True) + if len(events) < 2: + raise ValueError(f"Expected at least 2 events, got {len(events)}") + + +with DAG( + dag_id="openlineage_sensor_mapped_tasks_dag", + start_date=datetime(2021, 1, 1), + schedule=None, + catchup=False, +) as dag: + wait_for_10_seconds = TimeDeltaSensor( + task_id="wait_for_10_seconds", + mode="reschedule", + poke_interval=5, + delta=timedelta(seconds=10), + ) + + mapped_tasks = [ + PythonOperator( + task_id=f"mapped_task_{i}", + python_callable=my_task, + op_args=[i], + ) + for i in range(2) + ] + + check_start_amount = PythonOperator( + task_id="check_order", + python_callable=check_start_amount_func, + ) + + check_events = OpenLineageTestOperator( + task_id="check_events", + file_path=f"{os.getenv('AIRFLOW_HOME')}/dags/providers/tests/system/openlineage/example_openlineage_mapped_sensor.json", + allow_duplicate_events=True, + ) + + wait_for_10_seconds >> mapped_tasks >> check_start_amount >> check_events + + +from tests_common.test_utils.system_tests import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/providers/tests/system/openlineage/operator.py b/providers/tests/system/openlineage/operator.py new file mode 100644 index 0000000000000..a305ca7026a7f --- /dev/null +++ b/providers/tests/system/openlineage/operator.py @@ -0,0 +1,214 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import json +import logging +import os +import uuid +from typing import TYPE_CHECKING, Any +from urllib.parse import urlparse + +from dateutil.parser import parse +from jinja2 import Environment + +from airflow.models.operator import BaseOperator +from airflow.models.variable import Variable +from airflow.utils.session import create_session + +if TYPE_CHECKING: + from airflow.utils.context import Context + +log = logging.getLogger(__name__) + + +def any(result: Any) -> Any: + return result + + +def is_datetime(result: Any) -> str: + try: + parse(result) + return "true" + except Exception: + pass + return "false" + + +def is_uuid(result: Any) -> str: + try: + uuid.UUID(result) + return "true" + except Exception: + pass + return "false" + + +def env_var(var: str, default: str | None = None) -> str: + """ + Use this jinja method to access the environment variable named 'var'. + + If there is no such environment variable set, return the default. + If the default is None, raise an exception for an undefined variable. + """ + if var in os.environ: + return os.environ[var] + elif default is not None: + return default + raise ValueError(f"Env var required but not provided: '{var}'") + + +def not_match(result: str, pattern: str) -> str: + if pattern in result: + raise ValueError(f"Found {pattern} in {result}") + return "true" + + +def url_scheme_authority(url: str) -> str: + parsed = urlparse(url) + return f"{parsed.scheme}://{parsed.netloc}" + + +def url_path(url: str) -> str: + return urlparse(url).path + + +def setup_jinja() -> Environment: + env = Environment() + env.globals["any"] = any + env.globals["is_datetime"] = is_datetime + env.globals["is_uuid"] = is_uuid + env.globals["env_var"] = env_var + env.globals["not_match"] = not_match + env.filters["url_scheme_authority"] = url_scheme_authority + env.filters["url_path"] = url_path + return env + + +def match(expected, result, env: Environment) -> bool: + """ + Check if result is "equal" to expected value. + + Omits keys not specified in expected value and resolves any jinja templates found. + """ + if isinstance(expected, dict): + # Take a look only at keys present at expected dictionary + if not isinstance(result, dict): + log.error("Not a dict: %s\nExpected %s", result, expected) + return False + for k, v in expected.items(): + if k not in result: + log.error("Key %s not in received event %s\nExpected %s", k, result, expected) + return False + if not match(v, result[k], env): + log.error( + "For key %s, expected value %s not equals received %s\nExpected: %s, request: %s", + k, + v, + result[k], + expected, + result, + ) + return False + elif isinstance(expected, list): + if len(expected) != len(result): + log.error("Length does not match: expected %d, result: %d", len(expected), len(result)) + return False + for i, x in enumerate(expected): + if not match(x, result[i], env): + log.error( + "List not matched at %d\nexpected:\n%s\nresult: \n%s", + i, + json.dumps(x), + json.dumps(result[i]), + ) + return False + elif isinstance(expected, str): + if "{{" in expected: + # Evaluate jinja: in some cases, we want to check only if key exists, or if + # value has the right type + try: + rendered = env.from_string(expected).render(result=result) + except ValueError as e: + log.error("Error rendering jinja template %s: %s", expected, e) + return False + if rendered == "true" or rendered == result: + return True + log.error("Rendered value %s does not equal 'true' or %s", rendered, result) + return False + elif expected != result: + log.error("Expected value %s does not equal result %s", expected, result) + return False + elif expected != result: + log.error("Object of type %s: %s does not match %s", type(expected), expected, result) + return False + return True + + +class OpenLineageTestOperator(BaseOperator): + """ + This operator is added for system testing purposes. + + It compares expected event templates set on initialization with ones emitted by OpenLineage integration + and stored in Variables by VariableTransport. + :param event_templates: dictionary where key is the key used by VariableTransport in format of ..event., and value is event template (fragment) that need to be in received events. + :param file_path: alternatively, file_path pointing to file with event templates will be used + :param env: jinja environment used to render event templates + :param allow_duplicate_events: if set to True, allows multiple events for the same key + :param clear_variables: if set to True, clears all variables after checking events + :raises: ValueError if the received events do not match with expected ones. + """ + + def __init__( + self, + event_templates: dict[str, dict] | None = None, + file_path: str | None = None, + env: Environment = setup_jinja(), + allow_duplicate_events: bool = False, + clear_variables: bool = True, + **kwargs, + ): + super().__init__(**kwargs) + self.event_templates = event_templates + self.file_path = file_path + self.env = env + self.multiple_events = allow_duplicate_events + self.delete = clear_variables + if self.event_templates and self.file_path: + raise ValueError("Can't pass both event_templates and file_path") + + def execute(self, context: Context) -> None: + if self.file_path is not None: + self.event_templates = {} + with open(self.file_path) as f: # type: ignore[arg-type] + events = json.load(f) + for event in events: + key = event["job"]["name"] + ".event." + event["eventType"].lower() + self.event_templates[key] = event + for key, template in self.event_templates.items(): # type: ignore[union-attr] + send_event = Variable.get(key=key, deserialize_json=True) + if len(send_event) == 0: + raise ValueError(f"No event for key {key}") + if len(send_event) != 1 and not self.multiple_events: + raise ValueError(f"Expected one event for key {key}, got {len(send_event)}") + self.log.info("Events: %s, %s, %s", send_event, len(send_event), type(send_event)) + if not match(template, json.loads(send_event[0]), self.env): + raise ValueError("Event received does not match one specified in test") + if self.delete: + with create_session() as session: + session.query(Variable).delete() diff --git a/providers/tests/system/openlineage/transport/__init__.py b/providers/tests/system/openlineage/transport/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/tests/system/openlineage/transport/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/tests/system/openlineage/transport/variable.py b/providers/tests/system/openlineage/transport/variable.py new file mode 100644 index 0000000000000..beeeac5aff6d0 --- /dev/null +++ b/providers/tests/system/openlineage/transport/variable.py @@ -0,0 +1,49 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING + +from openlineage.client.serde import Serde +from openlineage.client.transport import Transport, get_default_factory + +from airflow.models.variable import Variable + +if TYPE_CHECKING: + from openlineage.client.client import Event + + +class VariableTransport(Transport): + """ + This transport sends OpenLineage events to Variables. + + Key schema is ..event.. + It's made to be used in system tests, stored data read by OpenLineageTestOperator. + """ + + kind = "variable" + + def emit(self, event: Event) -> None: + key = f"{event.job.name}.event.{event.eventType.value.lower()}" # type: ignore[union-attr] + event_str = Serde.to_json(event) + if (var := Variable.get(key=key, default_var=None, deserialize_json=True)) is not None: + Variable.set(key=key, value=var + [event_str], serialize_json=True) + else: + Variable.set(key=key, value=[event_str], serialize_json=True) + + +get_default_factory().register_transport(VariableTransport.kind, VariableTransport)