-
Notifications
You must be signed in to change notification settings - Fork 154
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
3 changed files
with
184 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
# Airflow | ||
|
||
This example shows how to run the `dstack` CLI and API from Airflow pipelines. | ||
It uses Airflow 2 and the [TaskFlow API](https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html). | ||
|
||
## Preparing a virtual environment | ||
|
||
`dstack` and Airflow may have conflicting dependencies, so it's recommended to install | ||
`dstack` to a separate virtual environment available to Airflow. | ||
|
||
Ensure the virtual environment created for `dstack` is | ||
available to all the workers in case your Airflow runs in a distributed environment. | ||
|
||
## Running dstack CLI | ||
|
||
To run the `dstack` CLI from Airflow, | ||
we can run it as regular bash commands using [BashOperator](https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/bash.html). | ||
The only special step here is that we need to activate a virtual environment before running `dstack`: | ||
|
||
```python | ||
|
||
DSTACK_VENV_PATH = "/path/to/dstack-venv" | ||
|
||
@dag(...) | ||
def pipeline(...): | ||
... | ||
@task.bash | ||
def dstack_cli_apply_venv() -> str: | ||
return ( | ||
f"source {DSTACK_VENV_PATH}/bin/activate" | ||
f" && cd {DSTACK_REPO_PATH}" | ||
" && dstack init" | ||
" && dstack apply -y -f task.dstack.yml" | ||
) | ||
``` | ||
|
||
## Running dstack API | ||
|
||
To run the `dstack` API from Airflow, we can use [ExternalPythonOperator](https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html#externalpythonoperator). Specify a path to the Python binary inside the dstack virtual environment, and | ||
Airflow will run the code inside that virtual environment: | ||
|
||
```python | ||
|
||
DSTACK_VENV_PYTHON_BINARY_PATH = f"{DSTACK_VENV_PATH}/bin/python" | ||
|
||
@dag(...) | ||
def pipeline(...): | ||
... | ||
@task.external_python(task_id="external_python", python=DSTACK_VENV_PYTHON_BINARY_PATH) | ||
def dstack_api_submit_venv() -> str: | ||
from dstack.api import Client, Task | ||
|
||
task = Task( | ||
commands=[ | ||
"echo 'Running dstack task via Airflow'", | ||
"sleep 10", | ||
"echo 'Finished'", | ||
] | ||
) | ||
# Pick up config from `~/.dstack/config.yml` | ||
# or set explicitly from Ariflow Variables. | ||
client = Client.from_config() | ||
|
||
run = client.runs.submit( | ||
run_name="my-airflow-task", | ||
configuration=task, | ||
) | ||
run.attach() | ||
try: | ||
for log in run.logs(): | ||
sys.stdout.buffer.write(log) | ||
sys.stdout.buffer.flush() | ||
except KeyboardInterrupt: | ||
run.stop(abort=True) | ||
finally: | ||
run.detach() | ||
``` | ||
|
||
## Source code | ||
|
||
The source code for this example can be found in | ||
[`examples/misc/airflow` :material-arrow-top-right-thin:{ .external }](https://github.com/dstackai/dstack/blob/master/examples/misc/airflow). |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
type: task | ||
commands: | ||
- echo "Running dstack task via Airflow" | ||
- sleep 10 | ||
- echo "Finished" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
import os | ||
import sys | ||
from datetime import datetime, timedelta | ||
|
||
from airflow.configuration import conf | ||
from airflow.decorators import dag, task | ||
|
||
# dstack repo files are stored in the dags folder as an example. | ||
# Put dstack repo files in another place if appropriate. | ||
DAGS_DIR_PATH = os.path.join(conf.get("core", "DAGS_FOLDER")) | ||
DSTACK_REPO_PATH = f"{DAGS_DIR_PATH}/dstack-repo" | ||
|
||
# A separate virtual environment should be created for dstack if dstack cannot be | ||
# installed into the main Airflow environment. For example, due to incompatible dependencies. | ||
DSTACK_VENV_PATH = "/path/to/dstack-venv" # Change this ! | ||
DSTACK_VENV_PYTHON_BINARY_PATH = f"{DSTACK_VENV_PATH}/bin/python" | ||
|
||
|
||
default_args = { | ||
"owner": "airflow", | ||
"retries": 1, | ||
"retry_delay": timedelta(minutes=5), | ||
"start_date": datetime(2024, 11, 13), | ||
} | ||
|
||
|
||
@dag( | ||
default_args=default_args, | ||
schedule_interval=timedelta(days=1), | ||
catchup=False, | ||
description="Examples of running dstack via Airflow", | ||
) | ||
def dstack_tasks(): | ||
@task.bash | ||
def dstack_cli_apply() -> str: | ||
""" | ||
This task shows how to run the dstack CLI when | ||
dstack is installed into the main Airflow environment. | ||
NOT RECOMMENDED since dstack and Airflow may have conflicting dependencies. | ||
""" | ||
return f"cd {DSTACK_REPO_PATH}" " && dstack init" " && dstack apply -y -f task.dstack.yml" | ||
|
||
@task.bash | ||
def dstack_cli_apply_venv() -> str: | ||
""" | ||
This task shows how to run the dstack CLI when | ||
dstack is installed into a separate virtual environment available to Airflow. | ||
""" | ||
return ( | ||
f"source {DSTACK_VENV_PATH}/bin/activate" | ||
f" && cd {DSTACK_REPO_PATH}" | ||
" && dstack init" | ||
" && dstack apply -y -f task.dstack.yml" | ||
) | ||
|
||
@task.external_python(task_id="external_python", python=DSTACK_VENV_PYTHON_BINARY_PATH) | ||
def dstack_api_submit_venv() -> str: | ||
""" | ||
This task shows how to run the dstack API when | ||
dstack is installed into a separate virtual environment available to Airflow. | ||
Note that the venv must have the `pendulum` package installed. | ||
""" | ||
from dstack.api import Client, Task | ||
|
||
task = Task( | ||
commands=[ | ||
"echo 'Running dstack task via Airflow'", | ||
"sleep 10", | ||
"echo 'Finished'", | ||
] | ||
) | ||
# Pick up config from `~/.dstack/config.yml` | ||
# or set explicitly from Ariflow Variables. | ||
client = Client.from_config() | ||
|
||
run = client.runs.submit( | ||
run_name="my-airflow-task", | ||
configuration=task, | ||
) | ||
run.attach() | ||
try: | ||
for log in run.logs(): | ||
sys.stdout.buffer.write(log) | ||
sys.stdout.buffer.flush() | ||
except KeyboardInterrupt: | ||
run.stop(abort=True) | ||
finally: | ||
run.detach() | ||
|
||
# Uncomment a task you want to run | ||
|
||
# dstack_cli_apply() | ||
# dstack_cli_apply_venv() | ||
dstack_api_submit_venv() | ||
|
||
|
||
dstack_tasks() |