---
sidebar_position: 2
---

# Getting Started with Airflow and OpenLineage+Marquez

In this example, we'll walk you through how to enable Airflow DAGs to send lineage metadata to [Marquez](https://marquezproject.ai/) using OpenLineage.

### You’ll Learn How To:

* configure Airflow to send OpenLineage events to Marquez
* write OpenLineage-enabled DAGs
* troubleshoot a failing DAG using Marquez

## Prerequisites

Before you begin, make sure you have installed:

* [Docker 17.05](https://docs.docker.com/install)+
* [Astro CLI](https://docs.astronomer.io/astro/cli/overview)
* [Subversion](https://subversion.apache.org/)

> **Note:** We recommend that you allocate at least **2 CPUs** and **8 GB** of memory to Docker.

## Step 1: Configure Your Astro Project

Use the Astro CLI to create and run an Airflow project locally that will integrate with Marquez.

1. In your project directory, create a new Astro project:

   ```sh
   $ mkdir astro-marquez-tutorial && cd astro-marquez-tutorial
   $ astro dev init
   ```

2. Using Subversion, download some scripts required by the Marquez services:

   ```sh
   $ svn checkout https://github.com/MarquezProject/marquez/trunk/docker
   ```

   After executing the above, your project directory should look like this:

   ```sh
   $ ls -a
   .              Dockerfile              packages.txt
   ..             README.md               plugins
   .astro         airflow_settings.yaml   requirements.txt
   .dockerignore  dags                    tests
   .env           docker
   .gitignore     include
   ```

3. To configure Astro to send lineage metadata to Marquez, add the following environment variables to your Astro project's `.env` file:

   ```bash
   OPENLINEAGE_URL=http://host.docker.internal:5000
   OPENLINEAGE_NAMESPACE=example
   AIRFLOW__LINEAGE__BACKEND=openlineage.lineage_backend.OpenLineageBackend
   ```

   These variables allow Airflow to connect with the OpenLineage API and send events to Marquez (see the sample event after this list).

4. It's a good idea to have Airflow use a port for Postgres other than the default 5432, so run the following command to use port 5678 instead:

   ```sh
   $ astro config set postgres.port 5678
   ```
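
To give you a sense of what the lineage backend sends: OpenLineage events are JSON documents POSTed to the `/api/v1/lineage` endpoint of the server configured in `OPENLINEAGE_URL`. As a minimal sketch, you could emit one by hand with `curl`; the `runId`, timestamp, and job name below are placeholder values:

```sh
$ curl -X POST http://localhost:5000/api/v1/lineage \
    -H 'Content-Type: application/json' \
    -d '{
          "eventType": "START",
          "eventTime": "2023-01-01T00:00:00.000Z",
          "run": { "runId": "d46e465b-d358-4d32-83d4-df660ff614dd" },
          "job": { "namespace": "example", "name": "counter.inc" },
          "inputs": [],
          "outputs": [],
          "producer": "https://github.com/OpenLineage/OpenLineage"
        }'
```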

## Step 2: Add Marquez Services using Docker Compose

Astro supports manual configuration of services via Docker Compose using YAML.

Create a new file named `docker-compose.override.yml` in your project and copy/paste the following into it:

```yml
version: "3.1"
services:
  web:
    image: marquezproject/marquez-web:latest
    container_name: marquez-web
    environment:
      - MARQUEZ_HOST=api
      - MARQUEZ_PORT=5000
    ports:
      - "3000:3000"
    depends_on:
      - api

  db:
    image: postgres:12.1
    container_name: marquez-db
    ports:
      - "6543:6543"
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=password
      - MARQUEZ_DB=marquez
      - MARQUEZ_USER=marquez
      - MARQUEZ_PASSWORD=marquez
    volumes:
      - ./docker/init-db.sh:/docker-entrypoint-initdb.d/init-db.sh
    command: ["postgres", "-c", "log_statement=all"]

  api:
    image: marquezproject/marquez:latest
    container_name: marquez-api
    environment:
      - MARQUEZ_PORT=5000
      - MARQUEZ_ADMIN_PORT=5001
    ports:
      - "5000:5000"
      - "5001:5001"
    volumes:
      - ./docker/wait-for-it.sh:/usr/src/app/wait-for-it.sh
    links:
      - "db:postgres"
    depends_on:
      - db
    entrypoint: ["/bin/bash", "./wait-for-it.sh", "db:6543", "--", "./entrypoint.sh"]
```

The above adds the Marquez API, database, and Web UI to Astro's Docker Compose setup and configures them to use the scripts in the `docker` directory you previously downloaded from Marquez.

## Step 3: Start Airflow with Marquez

Now you can start all services. To do so, verify that Docker is running and execute the following:

```bash
$ astro dev start
```

**The above command will:**

* start Airflow
* start the Marquez API, database, and UI

To view the Airflow UI and verify it's running, open [http://localhost:8080](http://localhost:8080) and log in with the username and password `admin`/`admin`. You can also browse to [http://localhost:3000](http://localhost:3000) to view the Marquez UI.
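
To confirm that the Marquez API is up as well, you can query its HTTP API directly (the port below assumes the override file from Step 2):

```sh
$ curl http://localhost:5000/api/v1/namespaces
```

This should return a JSON document listing the available namespaces; after your DAGs have run, it will include the `example` namespace configured in `.env`.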

## Step 4: Write Airflow DAGs

In this step, you will create two new Airflow DAGs that perform simple tasks. The `counter` DAG inserts a new row with the value 1 into a table every minute, while the `sum` DAG totals those values every five minutes. This will result in a simple pipeline containing two jobs and two datasets.

### Step 4.1: Create a `counter` DAG

In `dags/`, create a file named `counter.py` and add the following code:

```python
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.utils.dates import days_ago

with DAG(
    'counter',
    start_date=days_ago(1),
    schedule='*/1 * * * *',
    catchup=False,
    is_paused_upon_creation=False,
    max_active_runs=1,
    description='DAG that generates a new count value equal to 1.'
):

    query1 = PostgresOperator(
        task_id='if_not_exists',
        postgres_conn_id='example_db',
        sql='''
        CREATE TABLE IF NOT EXISTS counts (
          value INTEGER
        );'''
    )

    query2 = PostgresOperator(
        task_id='inc',
        postgres_conn_id='example_db',
        sql='''
        INSERT INTO counts (value)
        VALUES (1)
        '''
    )

    query1 >> query2
```

### Step 4.2: Create a `sum` DAG

In `dags/`, create a file named `sum.py` and add the following code:

```python
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.utils.dates import days_ago

with DAG(
    'sum',
    start_date=days_ago(1),
    schedule='*/5 * * * *',
    catchup=False,
    is_paused_upon_creation=False,
    max_active_runs=1,
    description='DAG that sums the total of generated count values.'
):

    query1 = PostgresOperator(
        task_id='if_not_exists',
        postgres_conn_id='example_db',
        sql='''
        CREATE TABLE IF NOT EXISTS sums (
          value INTEGER
        );'''
    )

    query2 = PostgresOperator(
        task_id='total',
        postgres_conn_id='example_db',
        sql='''
        INSERT INTO sums (value)
        SELECT SUM(value) FROM counts;
        '''
    )

    query1 >> query2
```
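
Both DAGs reference a Postgres connection with the conn id `example_db`, which Airflow needs to have defined before the tasks can run. One way to provide it is an Airflow connection URI environment variable in your `.env` file; the user, password, host, port, and database below are placeholders, so point them at whichever Postgres instance you want the DAGs to write to:

```sh
# Hypothetical connection for the DAGs' example_db conn id; adjust the
# credentials, host, port, and database to match your environment.
AIRFLOW_CONN_EXAMPLE_DB=postgres://postgres:postgres@host.docker.internal:5678/postgres
```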

## Step 5: View Collected Metadata

To ensure that Airflow is executing `counter` and `sum`, navigate to the DAGs tab in Airflow and verify that they are both enabled and in a _running_ state:

To view DAG metadata collected by Marquez from Airflow, browse to the Marquez UI by visiting [http://localhost:3000](http://localhost:3000). Then, use the _search_ bar in the upper right of the page to search for the `counter.inc` job. To view lineage metadata for `counter.inc`, click on the job in the drop-down list:

> **Note:** If the `counter.inc` job is not in the drop-down list, check whether Airflow has successfully executed the DAG.

<p align="center">
  <img src={require("./docs/current-search-count.png").default} />
</p>

If you take a quick look at the lineage graph for `counter.inc`, you should see `.public.counts` as an output dataset and `sum.total` as a downstream job!
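
You can also retrieve the same metadata from the Marquez HTTP API. For example, to list the jobs collected under the `example` namespace configured in `.env`:

```sh
$ curl "http://localhost:5000/api/v1/namespaces/example/jobs"
```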

## Step 6: Troubleshoot a Failing DAG with Marquez

In this step, let's quickly walk through a simple troubleshooting scenario where the DAG `sum` begins to fail as the result of an upstream schema change to the table `counts`.

> **Tip:** It's helpful to apply the same code changes outlined below to your Airflow DAGs defined in **Step 4**.

Let's say team `A` owns the DAG `counter`. Team `A` decides to update the tasks in `counter` to rename the `value` column in the `counts` table to `value_1_to_10` (without properly communicating the schema change!):

```diff
 query1 = PostgresOperator(
-    task_id='if_not_exists',
+    task_id='alter_name_of_column',
     postgres_conn_id='example_db',
     sql='''
-    CREATE TABLE IF NOT EXISTS counts (
-      value INTEGER
-    );'''
+    DO $$
+    BEGIN
+        IF EXISTS(SELECT *
+            FROM information_schema.columns
+            WHERE table_name='counts' AND column_name='value')
+        THEN
+            ALTER TABLE "counts" RENAME COLUMN "value" TO "value_1_to_10";
+        END IF;
+    END $$;
+    '''
 )
```

```diff
 query2 = PostgresOperator(
     task_id='inc',
     postgres_conn_id='example_db',
     sql='''
-    INSERT INTO counts (value)
+    INSERT INTO counts (value_1_to_10)
     VALUES (1)
     '''
 )
```

Team `B`, unaware of the schema change, owns the DAG `sum` and begins to see DAG run metadata with _failed_ run states:

But team `B` is not sure what might have caused the DAG failure, as no recent code changes have been made to the DAG. So team `B` decides to check the schema of the input dataset:
Team `B` soon realizes that the schema has changed recently for the `counts` table! To fix the DAG, team `B` updates the `total` task that calculates the count total to use the new column name:

```diff
 query2 = PostgresOperator(
     task_id='total',
     postgres_conn_id='example_db',
     sql='''
     INSERT INTO sums (value)
-    SELECT SUM(value) FROM counts;
+    SELECT SUM(value_1_to_10) FROM counts;
     '''
 )
```
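
Team `B` could also have confirmed the schema change through the Marquez HTTP API. As a rough sketch: dataset namespaces are derived from the connection rather than from `OPENLINEAGE_NAMESPACE`, so list the namespaces first, then substitute the Postgres-derived one (the placeholder below) into the datasets call:

```sh
$ curl http://localhost:5000/api/v1/namespaces
$ curl "http://localhost:5000/api/v1/namespaces/<postgres-namespace>/datasets"
```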

With the code change, the DAG `sum` begins to run successfully:

_Congrats_! You've successfully stepped through a troubleshooting scenario of a failing DAG using metadata collected with Marquez! You can now add your own DAGs to `dags/` to build more complex data lineage graphs.

## Next Steps

* Review the Marquez [HTTP API](https://marquezproject.github.io/marquez/openapi.html) used to collect Airflow DAG metadata and learn how to build your own integrations using OpenLineage.
* Take a look at the [`openlineage-spark`](https://openlineage.io/docs/integrations/spark/) integration that can be used with Airflow.

## Feedback

What did you think of this example? You can reach out to us on [Slack](http://bit.ly/MqzSlack) and leave us feedback, or [open a pull request](https://github.com/MarquezProject/marquez/blob/main/CONTRIBUTING.md#submitting-a-pull-request) with your suggestions!