diff --git a/examples/misc/airflow/README.md b/examples/misc/airflow/README.md new file mode 100644 index 000000000..d9959fb9a --- /dev/null +++ b/examples/misc/airflow/README.md @@ -0,0 +1,82 @@ +# Airflow + +This example shows how to run the `dstack` CLI and API from Airflow pipelines. +It uses Airflow 2 and the [TaskFlow API](https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html). + +## Preparing a virtual environment + +`dstack` and Airflow may have conflicting dependencies, so it's recommended to install +`dstack` to a separate virtual environment available to Airflow. + +Ensure the virtual environment created for `dstack` is +available to all the workers in case your Airflow runs in a distributed environment. + +## Running dstack CLI + +To run the `dstack` CLI from Airflow, +we can run it as regular bash commands using [BashOperator](https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/bash.html). +The only special step here is that we need to activate a virtual environment before running `dstack`: + +```python + +DSTACK_VENV_PATH = "/path/to/dstack-venv" + +@dag(...) +def pipeline(...): + ... + @task.bash + def dstack_cli_apply_venv() -> str: + return ( + f"source {DSTACK_VENV_PATH}/bin/activate" + f" && cd {DSTACK_REPO_PATH}" + " && dstack init" + " && dstack apply -y -f task.dstack.yml" + ) +``` + +## Running dstack API + +To run the `dstack` API from Airflow, we can use [ExternalPythonOperator](https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html#externalpythonoperator). Specify a path to the Python binary inside the dstack virtual environment, and +Airflow will run the code inside that virtual environment: + +```python + +DSTACK_VENV_PYTHON_BINARY_PATH = f"{DSTACK_VENV_PATH}/bin/python" + +@dag(...) +def pipeline(...): + ... 
+ @task.external_python(task_id="external_python", python=DSTACK_VENV_PYTHON_BINARY_PATH) + def dstack_api_submit_venv() -> str: + from dstack.api import Client, Task + + task = Task( + commands=[ + "echo 'Running dstack task via Airflow'", + "sleep 10", + "echo 'Finished'", + ] + ) + # Pick up config from `~/.dstack/config.yml` + # or set explicitly from Airflow Variables. + client = Client.from_config() + + run = client.runs.submit( + run_name="my-airflow-task", + configuration=task, + ) + run.attach() + try: + for log in run.logs(): + sys.stdout.buffer.write(log) + sys.stdout.buffer.flush() + except KeyboardInterrupt: + run.stop(abort=True) + finally: + run.detach() +``` + +## Source code + +The source code for this example can be found in +[`examples/misc/airflow` :material-arrow-top-right-thin:{ .external }](https://github.com/dstackai/dstack/blob/master/examples/misc/airflow). diff --git a/examples/misc/airflow/dags/dstack-repo/task.dstack.yml b/examples/misc/airflow/dags/dstack-repo/task.dstack.yml new file mode 100644 index 000000000..53812c25e --- /dev/null +++ b/examples/misc/airflow/dags/dstack-repo/task.dstack.yml @@ -0,0 +1,5 @@ +type: task +commands: + - echo "Running dstack task via Airflow" + - sleep 10 + - echo "Finished" diff --git a/examples/misc/airflow/dags/dstack_tasks.py b/examples/misc/airflow/dags/dstack_tasks.py new file mode 100644 index 000000000..6b02f096a --- /dev/null +++ b/examples/misc/airflow/dags/dstack_tasks.py @@ -0,0 +1,97 @@ +import os +import sys +from datetime import datetime, timedelta + +from airflow.configuration import conf +from airflow.decorators import dag, task + +# dstack repo files are stored in the dags folder as an example. +# Put dstack repo files in another place if appropriate. 
+DAGS_DIR_PATH = os.path.join(conf.get("core", "DAGS_FOLDER")) +DSTACK_REPO_PATH = f"{DAGS_DIR_PATH}/dstack-repo" + +# A separate virtual environment should be created for dstack if dstack cannot be +# installed into the main Airflow environment. For example, due to incompatible dependencies. +DSTACK_VENV_PATH = "/path/to/dstack-venv" # Change this ! +DSTACK_VENV_PYTHON_BINARY_PATH = f"{DSTACK_VENV_PATH}/bin/python" + + +default_args = { + "owner": "airflow", + "retries": 1, + "retry_delay": timedelta(minutes=5), + "start_date": datetime(2024, 11, 13), +} + + +@dag( + default_args=default_args, + schedule_interval=timedelta(days=1), + catchup=False, + description="Examples of running dstack via Airflow", +) +def dstack_tasks(): + @task.bash + def dstack_cli_apply() -> str: + """ + This task shows how to run the dstack CLI when + dstack is installed into the main Airflow environment. + NOT RECOMMENDED since dstack and Airflow may have conflicting dependencies. + """ + return f"cd {DSTACK_REPO_PATH}" " && dstack init" " && dstack apply -y -f task.dstack.yml" + + @task.bash + def dstack_cli_apply_venv() -> str: + """ + This task shows how to run the dstack CLI when + dstack is installed into a separate virtual environment available to Airflow. + """ + return ( + f"source {DSTACK_VENV_PATH}/bin/activate" + f" && cd {DSTACK_REPO_PATH}" + " && dstack init" + " && dstack apply -y -f task.dstack.yml" + ) + + @task.external_python(task_id="external_python", python=DSTACK_VENV_PYTHON_BINARY_PATH) + def dstack_api_submit_venv() -> str: + """ + This task shows how to run the dstack API when + dstack is installed into a separate virtual environment available to Airflow. + Note that the venv must have the `pendulum` package installed. 
+ """ + from dstack.api import Client, Task + + task = Task( + commands=[ + "echo 'Running dstack task via Airflow'", + "sleep 10", + "echo 'Finished'", + ] + ) + # Pick up config from `~/.dstack/config.yml` + # or set explicitly from Ariflow Variables. + client = Client.from_config() + + run = client.runs.submit( + run_name="my-airflow-task", + configuration=task, + ) + run.attach() + try: + for log in run.logs(): + sys.stdout.buffer.write(log) + sys.stdout.buffer.flush() + except KeyboardInterrupt: + run.stop(abort=True) + finally: + run.detach() + + # Uncomment a task you want to run + + # dstack_cli_apply() + # dstack_cli_apply_venv() + dstack_api_submit_venv() + + +dstack_tasks()