Skip to content

Commit

Permalink
add basic system tests for OpenLineage (apache#43643)
Browse files Browse the repository at this point in the history
Signed-off-by: Maciej Obuchowski <[email protected]>
  • Loading branch information
mobuchowski authored Nov 27, 2024
1 parent 890b3ec commit 7cc3610
Show file tree
Hide file tree
Showing 11 changed files with 590 additions and 2 deletions.
8 changes: 8 additions & 0 deletions docs/apache-airflow-providers-openlineage/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@
PyPI Repository <https://pypi.org/project/apache-airflow-providers-openlineage/>
Installing from sources <installing-providers-from-sources>

.. toctree::
:hidden:
:maxdepth: 1
:caption: System tests

System Tests <_api/tests/system/openlineage/index>


.. THE REMAINDER OF THE FILE IS AUTOMATICALLY GENERATED. IT WILL BE OVERWRITTEN AT RELEASE TIME!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,10 @@ def emit(self, event: RunEvent):
stack.enter_context(Stats.timer("ol.emit.attempts"))
self._client.emit(redacted_event)
self.log.debug("Successfully emitted OpenLineage event of id %s", event.run.runId)
except Exception as e:
except Exception:
Stats.incr("ol.emit.failed")
self.log.warning("Failed to emit OpenLineage event of id %s", event.run.runId)
self.log.debug("OpenLineage emission failure: %s", e)
self.log.debug("OpenLineage emission failure: %s", exc_info=True)

return redacted_event

Expand Down
16 changes: 16 additions & 0 deletions providers/tests/system/openlineage/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
36 changes: 36 additions & 0 deletions providers/tests/system/openlineage/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import pytest

from airflow.listeners.listener import get_listener_manager
from airflow.providers.openlineage.plugins.listener import OpenLineageListener

from providers.tests.system.openlineage.transport.variable import VariableTransport


@pytest.fixture(autouse=True)
def set_transport_variable():
    """Register an OpenLineage listener whose client emits via ``VariableTransport``.

    The fixture is ``autouse``, so it wraps every test in its scope:

    * before the test, the listener manager is cleared and a fresh
      ``OpenLineageListener`` is registered with its client's transport
      replaced by a ``VariableTransport`` instance;
    * after the test, the listener manager is cleared again.
    """
    listener_manager = get_listener_manager()
    # Start from a clean slate so only the listener configured here is active.
    listener_manager.clear()

    ol_listener = OpenLineageListener()
    # Make sure the adapter has a client, then swap in the test transport.
    ol_listener.adapter._client = ol_listener.adapter.get_or_create_openlineage_client()
    ol_listener.adapter._client.transport = VariableTransport()
    listener_manager.add_listener(ol_listener)

    yield

    # Teardown: remove the listener registered above.
    listener_manager.clear()
38 changes: 38 additions & 0 deletions providers/tests/system/openlineage/example_openlineage.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
[
{
"eventType": "START",
"eventTime": "{{ is_datetime(result) }}",
"run": {
"runId": "{{ is_uuid(result) }}"
},
"job": {
"namespace": "default",
"name": "openlineage_basic_dag.do_nothing_task",
"facets": {
"jobType": {
"integration": "AIRFLOW",
"jobType": "TASK",
"processingType": "BATCH"
}
}
}
},
{
"eventType": "COMPLETE",
"eventTime": "{{ is_datetime(result) }}",
"run": {
"runId": "{{ is_uuid(result) }}"
},
"job": {
"namespace": "default",
"name": "openlineage_basic_dag.do_nothing_task",
"facets": {
"jobType": {
"integration": "AIRFLOW",
"jobType": "TASK",
"processingType": "BATCH"
}
}
}
}
]
55 changes: 55 additions & 0 deletions providers/tests/system/openlineage/example_openlineage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import os
from datetime import datetime

from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator

from providers.tests.system.openlineage.operator import OpenLineageTestOperator


def do_nothing():
    """Do nothing and return ``None``; used as a no-op task callable."""


# Arguments handed to the DAG as ``default_args``.
default_args = {
    "start_date": datetime(2021, 1, 1),
    "retries": 1,
}

# A single no-op task followed by an OpenLineageTestOperator pointed at
# example_openlineage.json (presumably the expected events -- confirm against
# the operator implementation).
with DAG(
    dag_id="openlineage_basic_dag",
    default_args=default_args,
    start_date=datetime(2021, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    nothing_task = PythonOperator(
        task_id="do_nothing_task",
        python_callable=do_nothing,
    )

    check_events = OpenLineageTestOperator(
        task_id="check_events",
        # NOTE(review): the path is derived from AIRFLOW_HOME; when that
        # variable is unset the path starts with "None/" -- confirm the test
        # environment always sets it.
        file_path=f"{os.getenv('AIRFLOW_HOME')}/dags/providers/tests/system/openlineage/example_openlineage.json",
    )

    # The check runs only after the no-op task.
    nothing_task >> check_events


from tests_common.test_utils.system_tests import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
[
{
"eventType": "START",
"eventTime": "{{ is_datetime(result) }}",
"run": {
"runId": "{{ is_uuid(result) }}"
},
"job": {
"namespace": "default",
"name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds",
"facets": {
"jobType": {
"integration": "AIRFLOW",
"jobType": "TASK",
"processingType": "BATCH"
}
}
}
},
{
"eventType": "COMPLETE",
"eventTime": "{{ is_datetime(result) }}",
"run": {
"runId": "{{ is_uuid(result) }}"
},
"job": {
"namespace": "default",
"name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds",
"facets": {
"jobType": {
"integration": "AIRFLOW",
"jobType": "TASK",
"processingType": "BATCH"
}
}
}
},
{
"eventType": "START",
"eventTime": "{{ is_datetime(result) }}",
"run": {
"runId": "{{ is_uuid(result) }}"
},
"job": {
"namespace": "default",
"name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds",
"facets": {
"jobType": {
"integration": "AIRFLOW",
"jobType": "TASK",
"processingType": "BATCH"
}
}
}
},

{
"eventType": "COMPLETE",
"eventTime": "{{ is_datetime(result) }}",
"run": {
"runId": "{{ is_uuid(result) }}"
},
"job": {
"namespace": "default",
"name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds",
"facets": {
"jobType": {
"integration": "AIRFLOW",
"jobType": "TASK",
"processingType": "BATCH"
}
}
}
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.standard.sensors.time_delta import TimeDeltaSensor

from providers.tests.system.openlineage.operator import OpenLineageTestOperator


def my_task(task_number):
    """Print the current working directory, then the given task number.

    :param task_number: value interpolated into the "Executing task number" line.
    """
    working_dir = os.getcwd()
    print(working_dir)
    print(f"Executing task number: {task_number}")


def check_start_amount_func():
    """Fail unless the sensor's ``event.start`` Variable holds at least two entries.

    Reads the Airflow Variable
    ``openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds.event.start``
    as JSON (presumably the START events recorded for the sensor task --
    confirm against the transport that writes it).

    :raises ValueError: if the deserialized value has fewer than two entries.
    """
    variable_key = "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds.event.start"
    events = Variable.get(variable_key, deserialize_json=True)
    if len(events) >= 2:
        return
    raise ValueError(f"Expected at least 2 events, got {len(events)}")


# Sensor in reschedule mode -> two Python tasks -> START-event count check ->
# OpenLineageTestOperator pointed at example_openlineage_mapped_sensor.json.
with DAG(
    dag_id="openlineage_sensor_mapped_tasks_dag",
    start_date=datetime(2021, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    # A 10 second delta with a 5 second poke interval in "reschedule" mode.
    wait_for_10_seconds = TimeDeltaSensor(
        task_id="wait_for_10_seconds",
        delta=timedelta(seconds=10),
        poke_interval=5,
        mode="reschedule",
    )

    # Despite the name, these are two ordinary PythonOperator instances built
    # in a list, each receiving its index as the only positional argument.
    mapped_tasks = [
        PythonOperator(
            task_id=f"mapped_task_{i}",
            python_callable=my_task,
            op_args=[i],
        )
        for i in range(2)
    ]

    # Note: the task id is "check_order" although the callable checks a count.
    check_start_amount = PythonOperator(
        task_id="check_order",
        python_callable=check_start_amount_func,
    )

    check_events = OpenLineageTestOperator(
        task_id="check_events",
        # NOTE(review): the path is derived from AIRFLOW_HOME; when that
        # variable is unset the path starts with "None/" -- confirm the test
        # environment always sets it.
        file_path=f"{os.getenv('AIRFLOW_HOME')}/dags/providers/tests/system/openlineage/example_openlineage_mapped_sensor.json",
        allow_duplicate_events=True,
    )

    wait_for_10_seconds >> mapped_tasks >> check_start_amount >> check_events


from tests_common.test_utils.system_tests import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)
Loading

0 comments on commit 7cc3610

Please sign in to comment.