= Execute Teradata Stored Procedure through Airflow Teradata Provider
:experimental:
:page-author: Satish Chinthanippu
:page-email: [email protected]
:page-revdate: March 25th, 2024
:description: Execute Teradata Stored Procedure through Airflow Teradata Provider.
:keywords: data warehouses, compute storage separation, teradata, vantage, cloud data platform, object storage, business intelligence, enterprise analytics, orchestration, elt, airflow.
:tabs:

== Overview

This tutorial demonstrates how to use the Airflow Teradata provider to execute a Teradata stored procedure.
If you are new to Airflow or the Airflow Teradata provider, we recommend that you start with our link:https://quickstarts.teradata.com/airflow.html[introductory quickstart guide, window="_blank"].

== Prerequisites

* Ubuntu 22.x
* Access to a Teradata Vantage instance.
+
include::ROOT:partial$vantage_clearscape_analytics.adoc[]
* Python *3.8*, *3.9*, *3.10* or *3.11* installed.
* pip

== Install Apache Airflow

1. Set the AIRFLOW_HOME environment variable. Airflow requires a home directory and uses ~/airflow by default, but you can set a different location if you prefer. The AIRFLOW_HOME environment variable is used to inform Airflow of the desired location.
+
[source, bash]
----
export AIRFLOW_HOME=~/airflow
----
2. Install the stable `apache-airflow` version 2.8.2 from the PyPI repository:
+
[source, bash]
----
AIRFLOW_VERSION=2.8.2
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
----
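+
You can optionally verify the installation with a quick check (the reported version should match the `AIRFLOW_VERSION` installed above):
+
[source, bash]
----
airflow version
----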
3. Install the stable version of the Airflow Teradata provider from the PyPI repository:
+
[source, bash]
----
pip install "apache-airflow-providers-teradata"
----
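+
To confirm that the provider is available, list the installed providers and look for the Teradata entry:
+
[source, bash]
----
airflow providers list | grep -i teradata
----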

NOTE: For security reasons, the test connection functionality is disabled by default across the Airflow UI, API, and CLI.
The availability of this functionality can be controlled by the `test_connection` flag in the `core` section of the Airflow configuration (`$AIRFLOW_HOME/airflow.cfg`), or by defining the environment variable `AIRFLOW__CORE__TEST_CONNECTION=Enabled` before starting the Airflow server.
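
For reference, enabling it through the configuration file amounts to a small change in the `[core]` section of `$AIRFLOW_HOME/airflow.cfg` (a minimal sketch; the default value is `Disabled`):

----
[core]
test_connection = Enabled
----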


== Start Airflow Standalone

1. Run Airflow Standalone
+
[source, bash]
----
airflow standalone
----
2. Access the Airflow UI. Visit http://localhost:8080 in the browser and log in with the admin account details shown in the terminal.


Teradata Connections may be defined in Airflow in the following ways:

1. Using Airflow Web UI
2. Using Environment Variable

== Define a Teradata connection in Airflow Web UI

1. Open the Admin -> Connections section of the UI. Click the Create link to create a new connection.
+
image::{dir}/airflow-connection.png[Airflow admin dropdown, width=75%]
2. Fill in the connection details on the New Connection page.
+
image::{dir}/airflow-newconnection.png[Airflow New Connection, width=75%]
* Connection Id: A unique ID for the Teradata connection.
* Connection Type: Type of the system. Select Teradata.
* Database Server URL (required): The Teradata instance hostname to connect to.
* Database (optional): The name of the database to connect to.
* Login (required): The user name to connect with.
* Password (required): The password to connect with.
* Click Test and then Save.
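
As an alternative to the Web UI, the same connection can be created with the Airflow CLI. The sketch below assumes the placeholder host, user, password, and database values used elsewhere in this tutorial:

[source, bash]
----
airflow connections add teradata_conn_id \
    --conn-type teradata \
    --conn-host my-host \
    --conn-login teradata_user \
    --conn-password my-password \
    --conn-schema my-schema
----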

== Define a Teradata connection in Environment Variable
Airflow connections may be defined in environment variables in either of the following formats:

1. JSON format
2. URI format

NOTE: The naming convention is `AIRFLOW_CONN_{CONN_ID}`, all uppercase (note the single underscores surrounding `CONN`).
So if your connection id is `teradata_conn_id`, the variable name should be `AIRFLOW_CONN_TERADATA_CONN_ID`.


== JSON format example


[source, bash]
----
export AIRFLOW_CONN_TERADATA_CONN_ID='{
    "conn_type": "teradata",
    "login": "teradata_user",
    "password": "my-password",
    "host": "my-host",
    "schema": "my-schema",
    "extra": {
        "tmode": "TERA",
        "sslmode": "verify-ca"
    }
}'
----

== URI format example


[source, bash]
----
export AIRFLOW_CONN_TERADATA_CONN_ID='teradata://teradata_user:my-password@my-host/my-schema?tmode=TERA&sslmode=verify-ca'
----
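
After exporting the variable in either format, you can check that Airflow resolves the connection with a quick CLI sanity check:

[source, bash]
----
airflow connections get teradata_conn_id
----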

Refer to the https://airflow.apache.org/docs/apache-airflow-providers-teradata/stable/connections/teradata.html[Teradata Hook] documentation for detailed information on Teradata connections in Airflow.

== Define a DAG to execute Teradata Stored Procedure in Airflow

1. In Airflow, DAGs are defined as Python code.
2. Create the DAG as a Python file, for example `example_stored_procedure.py`, under the DAG_FOLDER ($AIRFLOW_HOME/files/dags) directory.
+
[source, python]
----
from datetime import datetime

from airflow import DAG
from airflow.providers.teradata.operators.teradata import (
    TeradataOperator,
    TeradataStoredProcedureOperator,
)

CONN_ID = "teradata_conn_id"

with DAG(
    dag_id="example_stored_procedure",
    max_active_runs=1,
    max_active_tasks=3,
    catchup=False,
    start_date=datetime(2023, 1, 1),
) as dag:
    # Create (or replace) the demo_sp stored procedure in the Teradata instance.
    create_sp = TeradataOperator(
        task_id="create_sp",
        conn_id=CONN_ID,
        sql=r"""replace procedure demo_sp (in p1 integer, inout p2 integer, out p3 integer)
            dynamic result sets 2
            begin
                declare cur1 cursor with return for select * from dbc.dbcinfo order by 1 ;
                declare cur2 cursor with return for select infodata, infokey from dbc.dbcinfo order by 1 ;
                open cur1 ;
                open cur2 ;
                set p2 = p1 + p2 ;
                set p3 = p1 * p2 ;
            end ;
        """,
    )
    # Execute demo_sp: IN and INOUT parameters are passed as values;
    # the OUT parameter is declared by its Python type (int).
    execute_sp = TeradataStoredProcedureOperator(
        task_id="execute_sp",
        conn_id=CONN_ID,
        procedure="demo_sp",
        parameters=[3, 2, int],
    )

    # Make sure the procedure exists before it is executed.
    create_sp >> execute_sp

----
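
The values returned by `execute_sp` are pushed to XCom under Airflow's default `do_xcom_push=True` behaviour, so a downstream task can consume them. The following hypothetical sketch (the `show_result` task and its callable are not part of the example above) shows how such a task could be added inside the same `with DAG(...)` block:

[source, python]
----
from airflow.operators.python import PythonOperator

def print_sp_result(ti):
    # Pull whatever execute_sp returned and pushed to XCom.
    result = ti.xcom_pull(task_ids="execute_sp")
    print(f"Stored procedure returned: {result}")

show_result = PythonOperator(
    task_id="show_result",
    python_callable=print_sp_result,
)

execute_sp >> show_result
----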

== Load DAG

Airflow loads DAGs from Python source files, which it looks for inside its configured DAG_FOLDER ($AIRFLOW_HOME/files/dags) directory.
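
In practice, loading the DAG is just a matter of placing the file in that folder. For example, assuming the DAG_FOLDER path above and that `example_stored_procedure.py` was created in the current directory:

[source, bash]
----
mkdir -p $AIRFLOW_HOME/files/dags
cp example_stored_procedure.py $AIRFLOW_HOME/files/dags/
----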

== Run DAG

DAGs will run in one of two ways:

1. When they are triggered either manually or via the API
2. On a defined schedule, which is defined as part of the DAG

`example_stored_procedure` is defined to be triggered manually. To define a schedule instead, any valid link:https://en.wikipedia.org/wiki/Cron[Crontab, window="_blank"] schedule value can be passed to the schedule argument, as shown below.

[source, python]
----
with DAG(
dag_id="example_stored_procedure",
schedule="0 0 * * *"
) as dag:
----
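
To trigger the DAG manually, enable and run it from the Airflow UI, or use the CLI:

[source, bash]
----
airflow dags trigger example_stored_procedure
----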

== Summary

This tutorial demonstrated how to use the Airflow Teradata provider to execute a Teradata stored procedure. The example DAG creates the stored procedure
`demo_sp` in the Teradata Vantage instance defined in the Airflow environment. The stored procedure `demo_sp` is defined with one IN parameter, one INOUT parameter, and one OUT parameter.
The INOUT/OUT parameter values can be used in the same DAG or in other DAGs. Please refer to https://quickstarts.teradata.com/pass-data-between-airflow-dags.html[Sharing data between DAGs in Airflow] for details.


== Further reading
https://quickstarts.teradata.com/airflow.html[Use Apache Airflow with Teradata Vantage]


include::ROOT:partial$community_link.adoc[]