Merge branch 'master' of github.com:dstackai/dstack

olgenn committed Nov 14, 2024
2 parents 7e8a573 + 6aac8e4 commit 08f359e

Showing 3 changed files with 184 additions and 0 deletions.
82 changes: 82 additions & 0 deletions examples/misc/airflow/README.md
@@ -0,0 +1,82 @@
# Airflow

This example shows how to run the `dstack` CLI and API from Airflow pipelines.
It uses Airflow 2 and the [TaskFlow API](https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html).

## Preparing a virtual environment

`dstack` and Airflow may have conflicting dependencies, so it's recommended to install
`dstack` into a separate virtual environment that is available to Airflow.

If your Airflow runs in a distributed environment, make sure the `dstack`
virtual environment is available on all the workers.
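
For example, a dedicated environment can be created like this (a minimal sketch;
the path is an example and should match the `DSTACK_VENV_PATH` used below):

```shell
# Create a virtual environment dedicated to dstack
python -m venv /path/to/dstack-venv

# Install dstack into it; pendulum is needed if you plan to run
# dstack API code via ExternalPythonOperator (see below)
/path/to/dstack-venv/bin/pip install dstack pendulum
```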

## Running dstack CLI

To run the `dstack` CLI from Airflow,
we can invoke it as regular Bash commands using [BashOperator](https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/bash.html).
The only special step is activating the virtual environment before calling `dstack`:

```python
DSTACK_VENV_PATH = "/path/to/dstack-venv"
# Path to the folder with the dstack configuration files
DSTACK_REPO_PATH = "/path/to/dstack-repo"


@dag(...)
def pipeline(...):
    ...
    @task.bash
    def dstack_cli_apply_venv() -> str:
        return (
            f"source {DSTACK_VENV_PATH}/bin/activate"
            f" && cd {DSTACK_REPO_PATH}"
            " && dstack init"
            " && dstack apply -y -f task.dstack.yml"
        )
```

## Running dstack API

To run the `dstack` API from Airflow, we can use [ExternalPythonOperator](https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html#externalpythonoperator). Specify the path to the Python binary inside the `dstack` virtual environment, and
Airflow will execute the task's code in that environment:

```python
DSTACK_VENV_PYTHON_BINARY_PATH = f"{DSTACK_VENV_PATH}/bin/python"


@dag(...)
def pipeline(...):
    ...
    @task.external_python(task_id="external_python", python=DSTACK_VENV_PYTHON_BINARY_PATH)
    def dstack_api_submit_venv() -> None:
        # Imports must happen inside the function since it is
        # executed in the external Python interpreter.
        import sys

        from dstack.api import Client, Task

        task = Task(
            commands=[
                "echo 'Running dstack task via Airflow'",
                "sleep 10",
                "echo 'Finished'",
            ]
        )
        # Pick up config from `~/.dstack/config.yml`
        # or set explicitly from Airflow Variables.
        client = Client.from_config()

        run = client.runs.submit(
            run_name="my-airflow-task",
            configuration=task,
        )
        run.attach()
        try:
            for log in run.logs():
                sys.stdout.buffer.write(log)
                sys.stdout.buffer.flush()
        except KeyboardInterrupt:
            run.stop(abort=True)
        finally:
            run.detach()
```
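
If the Airflow workers cannot rely on `~/.dstack/config.yml`, the connection details
can also be passed explicitly, e.g. from environment variables set on the worker.
A minimal sketch; the variable names are examples, and the `from_config()` keyword
arguments (`project_name`, `server_url`, `user_token`) are assumed to be supported
by your `dstack` version:

```python
import os

from dstack.api import Client

# Example environment variable names -- set them on the Airflow worker.
client = Client.from_config(
    project_name=os.environ["DSTACK_PROJECT"],
    server_url=os.environ["DSTACK_SERVER_URL"],
    user_token=os.environ["DSTACK_TOKEN"],
)
```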

## Source code

The source code for this example can be found in
[`examples/misc/airflow` :material-arrow-top-right-thin:{ .external }](https://github.com/dstackai/dstack/blob/master/examples/misc/airflow).
5 changes: 5 additions & 0 deletions examples/misc/airflow/dags/dstack-repo/task.dstack.yml
@@ -0,0 +1,5 @@
type: task
commands:
- echo "Running dstack task via Airflow"
- sleep 10
- echo "Finished"
97 changes: 97 additions & 0 deletions examples/misc/airflow/dags/dstack_tasks.py
@@ -0,0 +1,97 @@
import os
from datetime import datetime, timedelta

from airflow.configuration import conf
from airflow.decorators import dag, task

# The dstack repo files are stored in the DAGs folder as an example.
# Put the dstack repo files in another place if appropriate.
DAGS_DIR_PATH = conf.get("core", "DAGS_FOLDER")
DSTACK_REPO_PATH = f"{DAGS_DIR_PATH}/dstack-repo"

# A separate virtual environment should be created for dstack if dstack cannot be
# installed into the main Airflow environment, e.g. due to incompatible dependencies.
DSTACK_VENV_PATH = "/path/to/dstack-venv"  # Change this!
DSTACK_VENV_PYTHON_BINARY_PATH = f"{DSTACK_VENV_PATH}/bin/python"


default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "start_date": datetime(2024, 11, 13),
}


@dag(
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    catchup=False,
    description="Examples of running dstack via Airflow",
)
def dstack_tasks():
    @task.bash
    def dstack_cli_apply() -> str:
        """
        This task shows how to run the dstack CLI when
        dstack is installed into the main Airflow environment.
        NOT RECOMMENDED since dstack and Airflow may have conflicting dependencies.
        """
        return (
            f"cd {DSTACK_REPO_PATH}"
            " && dstack init"
            " && dstack apply -y -f task.dstack.yml"
        )

    @task.bash
    def dstack_cli_apply_venv() -> str:
        """
        This task shows how to run the dstack CLI when
        dstack is installed into a separate virtual environment available to Airflow.
        """
        return (
            f"source {DSTACK_VENV_PATH}/bin/activate"
            f" && cd {DSTACK_REPO_PATH}"
            " && dstack init"
            " && dstack apply -y -f task.dstack.yml"
        )

    @task.external_python(task_id="external_python", python=DSTACK_VENV_PYTHON_BINARY_PATH)
    def dstack_api_submit_venv() -> None:
        """
        This task shows how to run the dstack API when
        dstack is installed into a separate virtual environment available to Airflow.
        Note that the venv must have the `pendulum` package installed.
        """
        # Imports must happen inside the function since it is
        # executed in the external Python interpreter.
        import sys

        from dstack.api import Client, Task

        task = Task(
            commands=[
                "echo 'Running dstack task via Airflow'",
                "sleep 10",
                "echo 'Finished'",
            ]
        )
        # Pick up config from `~/.dstack/config.yml`
        # or set explicitly from Airflow Variables.
        client = Client.from_config()

        run = client.runs.submit(
            run_name="my-airflow-task",
            configuration=task,
        )
        run.attach()
        try:
            for log in run.logs():
                sys.stdout.buffer.write(log)
                sys.stdout.buffer.flush()
        except KeyboardInterrupt:
            run.stop(abort=True)
        finally:
            run.detach()

    # Uncomment the tasks you want to run
    # dstack_cli_apply()
    # dstack_cli_apply_venv()
    dstack_api_submit_venv()


dstack_tasks()
