Skip to content

Commit

Permalink
Merge pull request #34 from datacoves/feat-airflow-notifiers-document…
Browse files Browse the repository at this point in the history
…ation

Extend Notifiers documentation, also cover Slack ones
  • Loading branch information
BAntonellini authored Jun 21, 2024
2 parents 26480ee + 0f9ccdf commit 0c1cd68
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 45 deletions.
36 changes: 12 additions & 24 deletions docs/how-tos/airflow/send-ms-teams-notifications.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,29 +70,19 @@ In the examples below, we will send a notification on failing tasks or when the

> [!NOTE]In addition to `inform_failure` and `inform_success`, we support these callbacks `inform_failure`, `inform_success`, `inform_retry`, `inform_sla_miss`.
To send MS Teams notifications, in the Airflow DAG we need to import the appropriate callbacks and create a method that receives the following mandatory parameters:
To send MS Teams notifications, in the Airflow DAG we need to import the appropriate notifier and use it with the following parameters:

- `context` This is provided by Airflow
- `connection_id`: the name of the Datacoves Integration created above

Additionally, other parameters can be passed to customize the message sent to teams and the color shown on the message by passing:

- `message`: the body of the message
- `color`: border color of the MS Teams card
- `theme_color`: theme color of the MS Teams card

### Python version

```python
import datetime
from airflow.decorators import dag
from callbacks.microsoft_teams import inform_failure, inform_success
from operators.datacoves.dbt import DatacovesDbtOperator

def run_inform_success(context):
inform_success(context, connection_id="DATACOVES_MS_TEAMS", color="0000FF")

def run_inform_failure(context):
inform_failure(context, connection_id="DATACOVES_MS_TEAMS", color="9900FF")
from notifiers.datacoves.ms_teams import MSTeamsNotifier

@dag(
default_args={
Expand All @@ -105,8 +95,8 @@ def run_inform_failure(context):
schedule_interval="0 0 1 */12 *",
tags=["version_2", "ms_teams_notification", "blue_green"],
catchup=False,
on_success_callback=run_inform_success,
on_failure_callback=run_inform_failure,
on_success_callback=MSTeamsNotifier(message="DAG {{ dag.dag_id }} Succeeded"),
on_failure_callback=MSTeamsNotifier(message="DAG {{ dag.dag_id }} Failed"),
)
def dbt_run():
transform = DatacovesDbtOperator(
Expand All @@ -115,6 +105,7 @@ def dbt_run():

dag = yaml_teams_dag()
```

> [!NOTE]Quotation marks are not needed when setting the custom message. However, making use of Jinja in a YAML file requires the message to be wrapped quotations to be parsed properly. eg) "{{ dag.dag_id }} failed"
### YAML version
Expand All @@ -134,23 +125,20 @@ default_args:
catchup: false

# Optional callbacks used to send MS Teams notifications
custom_callbacks:
notifications:
on_success_callback:
module: callbacks.microsoft_teams
callable: inform_success
notifier: notifiers.datacoves.ms_teams.MSTeamsNotifier
args:
connection_id: DATACOVES_MS_TEAMS
# message: Custom success message
message: "DAG {{ dag.dag_id }} Succeeded"
color: 0000FF
on_failure_callback:
module: callbacks.microsoft_teams
callable: inform_failure
notifier: notifiers.datacoves.ms_teams.MSTeamsNotifier
args:
connection_id: DATACOVES_MS_TEAMS
# message: Custom error message
message: "DAG {{ dag.dag_id }} Failed"
color: 9900FF


# DAG Tasks
nodes:
transform:
Expand All @@ -160,6 +148,6 @@ nodes:
bash_command: "dbt run -s personal_loans"
```
## Getting Started Next Steps
## Getting Started Next Steps
Start [developing DAGs](getting-started/Admin/creating-airflow-dags.md)
36 changes: 15 additions & 21 deletions docs/how-tos/airflow/send-slack-notifications.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,31 +86,20 @@ In the examples below, we will send a notification on failing tasks or when the

> [!NOTE]In addition to `inform_failure` and `inform_success`, we support these callbacks `inform_failure`, `inform_success`, `inform_retry`, `inform_sla_miss`
To send Slack notifications, in the Airflow DAG we need to import the appropriate callbacks and create a method that receives the following mandatory parameters:
To send Slack notifications, in the Airflow DAG we need to import the appropriate callbacks and call them with:

- `context` This is provided by Airflow
- `connection_id`: the name of the Datacoves Integration created above

Additionally, `message` can be passed to customize the message sent to Slack
- `slack_webhook_conn_id`: the name of the Datacoves Integration created above
- `text`: to customize the message sent to Slack.

### Python version

```python
import datetime

from airflow.decorators import dag
from callbacks.slack_messages import inform_failure, inform_success
from kubernetes.client import models as k8s
from operators.datacoves.dbt import DatacovesDbtOperator


def run_inform_success(context):
inform_success(context, connection_id="DATACOVES_SLACK", color="0000FF")


def run_inform_failure(context):
inform_failure(context, connection_id="DATACOVES_SLACK", color="9900FF")

from airflow.providers.slack.notifications.slack_webhook import send_slack_webhook_notification

TRANSFORM_CONFIG = {
"pod_override": k8s.V1Pod(
Expand Down Expand Up @@ -140,8 +129,12 @@ TRANSFORM_CONFIG = {
schedule_interval="0 0 1 */12 *",
tags=["version_2", "slack_notification", "blue_green"],
catchup=False,
on_success_callback=run_inform_success,
on_failure_callback=run_inform_failure,
on_success_callback=send_slack_webhook_notification(
slack_webhook_conn_id="SLACK_NOTIFICATIONS", text="The dag {{ dag.dag_id }} succeeded"
),
on_failure_callback=send_slack_webhook_notification(
slack_webhook_conn_id="SLACK_NOTIFICATIONS", text="The dag {{ dag.dag_id }} failed"
),
)
def yaml_slack_dag():
transform = DatacovesDbtOperator(
Expand Down Expand Up @@ -174,12 +167,12 @@ catchup: false
# Optional callbacks used to send Slack notifications
callbacks:
on_success_callback:
callback: airflow.providers.slack.notifications.slack_webhook.SlackWebhookNotifier
callback: airflow.providers.slack.notifications.slack_webhook.send_slack_webhook_notification
args:
- slack_webhook_conn_id: SLACK_NOTIFICATIONS
- text: Custom success message
on_failure_callback:
callback: airflow.providers.slack.notifications.slack_webhook.SlackWebhookNotifier
callback: airflow.providers.slack.notifications.slack_webhook.send_slack_webhook_notification
args:
- slack_webhook_conn_id: SLACK_NOTIFICATIONS
- text: Custom error message
Expand All @@ -197,6 +190,7 @@ nodes:

bash_command: "dbt run -s personal_loans"
```
## Getting Started Next Steps
Start [developing DAGs](getting-started/Admin/creating-airflow-dags.md)
## Getting Started Next Steps
Start [developing DAGs](getting-started/Admin/creating-airflow-dags.md)

0 comments on commit 0c1cd68

Please sign in to comment.