
Commit ac645a2

Merge pull request #9 from michaelconan/feat/notion-habits
Feat/notion habits
2 parents 82079e6 + 1305371 · commit ac645a2

18 files changed: +313 −148 lines

.devcontainer/docker-compose.yml (+5)

@@ -4,8 +4,13 @@ services:
   app:
     environment:
       AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://postgres:postgres@db/postgres
+      AIRFLOW__DATABASE__LOAD_DEFAULT_CONNECTIONS: "false"
       AIRFLOW__CORE__EXECUTOR: LocalExecutor
       AIRFLOW__CORE__FERNET_KEY: ""
+      AIRFLOW__CORE__LOAD_EXAMPLES: "false"
+      AIRFLOW__WEBSERVER__INSTANCE_NAME: "Local Instance"
+      AIRFLOW__WEBSERVER__EXPOSE_CONFIG: "true"
+      AIRFLOW__CORE__DAGS_FOLDER: "/workspaces/personal-reporting/dags"
     build:
       context: .
       dockerfile: Dockerfile
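As a quick sanity check, the environment-variable overrides above can be confirmed from inside the dev container once Airflow is installed; a minimal sketch (not part of this commit, hypothetical helper file name):

# check_airflow_env.py - hypothetical helper to confirm the docker-compose overrides
from airflow.configuration import conf

print(conf.get("core", "dags_folder"))           # expect /workspaces/personal-reporting/dags
print(conf.get("core", "executor"))              # expect LocalExecutor
print(conf.getboolean("core", "load_examples"))  # expect False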

.gitignore (+3)

@@ -1,4 +1,7 @@
 __pycache__
 logs
 *.csv
+
+# Secrets and variables
 .env
+config/*

.vscode/launch.json (+18)

@@ -0,0 +1,18 @@
+{
+    "version": "0.2.0",
+    "configurations": [
+        {
+            // Disable coverage check to allow breakpoints in debug
+            "name": "Python Debugger: PyTest No Coverage",
+            "type": "debugpy",
+            "request": "launch",
+            "program": "${workspaceFolder}",
+            "module": "pytest",
+            "console": "integratedTerminal",
+            "args": [
+                "--no-cov"
+            ],
+            "justMyCode": true
+        }
+    ]
+}

.vscode/settings.json (+13)

@@ -0,0 +1,13 @@
+{
+    "python.testing.pytestArgs": [
+        "tests",
+        // Coverage of source code
+        "--cov=src/",
+        // INFO level pytest logging
+        "--log-cli-level=INFO",
+    ],
+    "python.testing.unittestEnabled": false,
+    "python.testing.pytestEnabled": true,
+    "editor.defaultFormatter": "ms-python.black-formatter",
+    "editor.formatOnSave": true
+}

Procfile (+1 −1)

@@ -1,2 +1,2 @@
 web: airflow webserver --hostname 0.0.0.0 --port ${PORT:-8080} --workers 2 --worker-timeout 600
-scheduler: airflow db migrate && airflow scheduler
+scheduler: airflow scheduler

README.md (+34 −33)

@@ -4,59 +4,60 @@ Airflow server for personal data integration and experimentation.
 
 ## Overview
 
-Dev container, requirements and constraints files used for local development prior to deployment.
-
-## Resources
-
-I have found a few resources covering deploying Airflow on Azure, none of which have been entirely usable for my purpose.
-
-- [2018 Azure blog](https://azure.microsoft.com/es-es/blog/deploying-apache-airflow-in-azure-to-build-and-run-data-pipelines/) and [2022 Azure Quickstart](https://learn.microsoft.com/en-us/samples/azure/azure-quickstart-templates/airflow-postgres-app-services/) both use Puckel airflow image which does not support Airflow 2.0
+This repository contains a Docker Development Container for VSCode and the infrastructure and workflows for my personal Airflow instance. It has been deployed as a Python 3.12 application to Azure App Service on a relatively small instance with a small PostgreSQL metadata database.
 
 ## Data Sources
 
-1. Google Contacts
+The following data will be ingested from my personal systems into a BigQuery warehouse for automation and analysis.
+
+1. Notion
 2. HubSpot
-3. Notion
+3. Google Contacts
 
-## Airflow Notes
+## Setup
 
-- DAGs should contain a number of related steps (e.g., extract-transform-load)
-- DAGs can be linked to one another via datasets to enable triggering and dependency graph
+### Airflow Setup
 
-## Python Setup (Attempt #2)
+While it generally isn't recommended to maintain infrastructure and workflows in the same repository, it is not a major concern for this basic setup. Currently there are no custom plugins or configuration overrides via `airflow.cfg` or `webserver_config.py` (only environment variables), so just the `DAGS_FOLDER` is mapped to the application code and configuration is loaded from the default `AIRFLOW_HOME`.
 
-As an alternative to Docker I tried a standard Python configuration for easier deployment to App Service.
-
-### Airflow Setup
+To run Airflow on a single instance, I use Honcho to run multiple processes (webserver + scheduler) via the Procfile.
 
 ### Azure Setup
 
 1. Create Web App + PostgreSQL with Python
 2. Turn on Application Insights, Logging
+3. Set relevant environment variables for Airflow
+   - `AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://{username}:{password}@{host}:{port}/{database}` from the Azure database
+   - `AIRFLOW__API__AUTH_BACKENDS=airflow.api.auth.backend.basic_auth, airflow.api.auth.backend.session` for authorization
+   - `AIRFLOW__CORE__EXECUTOR=LocalExecutor` to allow parallel execution on a single instance
+   - `AIRFLOW__CORE__DAGS_FOLDER=/home/site/wwwroot/dags` to load DAGs from the deployed application folder
+   - `AIRFLOW__CORE__FERNET_KEY={generated-key}` following [this guidance](https://airflow.apache.org/docs/apache-airflow/1.10.8/howto/secure-connections.html) to encrypt connection data
+   - `AIRFLOW__WEBSERVER__WEB_SERVER_MASTER_TIMEOUT=600` to allow for longer startup
+   - `AIRFLOW__WEBSERVER__EXPOSE_CONFIG=true` to facilitate administration
+4. Generate a Publish Profile file and deploy the application code from GitHub
+5. Set the startup command to use the `startup.txt` file
+6. Run database migrations (`airflow db migrate`) and user setup (`airflow users create`) as a one-off admin process; the Procfile covers only the main processes
+   - Reference the [quick start](https://airflow.apache.org/docs/apache-airflow/stable/start.html) for guidance on this setup process
+   - It may be necessary to run these via the startup command to get the app to launch
 
 ### Automated Deployment
 
-1. Referenced [this workflow](https://learn.microsoft.com/en-us/azure/app-service/deploy-github-actions?tabs=applevel%2Cpython%2Cpythonn) to deploy Python app to App Service
-
-## Docker Setup (Attempt #1)
+1. I referenced [this workflow](https://learn.microsoft.com/en-us/azure/app-service/deploy-github-actions?tabs=applevel%2Cpython%2Cpythonn) to deploy the Python app to App Service using Publish Profile basic authentication
 
-This was working fine locally but deployment to App Service did not go as well. Goal to use a custom extended airflow Docker container
+### Integrations
 
-### Airflow Setup
+1. Google Cloud BigQuery using the [Airflow BigQuery Provider](https://airflow.apache.org/docs/apache-airflow-providers-google/stable/operators/cloud/bigquery.html#upsert-table)
+2. Notion using the [Notion Client](https://pypi.org/project/notion-client/)
 
-1. Started with official [docker compose](https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html)
-2. Decided to [extend the image](https://airflow.apache.org/docs/docker-stack/build.html#extending-the-image) starting with the slim version due to limited packaged requirements
-3. Switched to `LocalExecutor`
-4. Stripped down the Docker Compose to a single container to run webserver + scheduler and use for dev container
-
-### Azure Setup
+## Testing
 
-From the resources above I determined I can run the webserver and scheduler together in a single container with `LocalExecutor`. `CeleryExecutor` shouldn't be necessary unless scale increases and multiple workers are required.
+### Environments
 
-1. Create PostgreSQL flexible server for database
-2. Create App Service app with container (NOTE: I followed [this tutorial](https://learn.microsoft.com/en-us/azure/app-service/configure-custom-container?tabs=debian&pivots=container-linux#enable-ssh) to configure SSH access in app service)
+Unit testing and the local instance are connected to a separate Google Cloud Platform project for development purposes.
 
-### Automated Docker Deployment
+### Setup Steps
 
-1. Referenced [this workflow](https://docs.github.com/en/actions/use-cases-and-examples/publishing-packages/publishing-docker-images#publishing-images-to-github-packages) to build and publish to GitHub Container Registry
-2. Referenced [this workflow](https://learn.microsoft.com/en-us/azure/app-service/deploy-best-practices#use-github-actions) to deploy updated image to Azure Web App
+1. Build the Dev Container in VSCode; this runs `script/setup` to install dependencies (including dev dependencies)
+2. To run the server locally, run `honcho start` in the terminal
+3. Add connection settings in the interface or upload them via file
+4. Write and run [unit tests for DAGs](https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#unit-tests)
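A note on the `AIRFLOW__CORE__FERNET_KEY` step above: per the linked Airflow guidance, a key can be generated with the `cryptography` package that Airflow already depends on. A minimal sketch (not part of this commit, hypothetical helper file name):

# generate_fernet_key.py - hypothetical one-off helper for the App Service setting
from cryptography.fernet import Fernet

# Generate a url-safe base64-encoded 32-byte key and print it for copy/paste
print(Fernet.generate_key().decode())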

config/airflow.cfg (−13)

This file was deleted.

dags/notion_habits.py (+173)

@@ -0,0 +1,173 @@
+"""
+notion_habits.py
+
+DAG to load daily and weekly habits from Notion API into BigQuery.
+"""
+
+# Basic imports
+import datetime
+import pendulum
+
+# Standard airflow imports
+from airflow.decorators import dag, task
+from airflow.models.param import Param
+from airflow.operators.python import get_current_context
+from airflow.exceptions import AirflowSkipException
+
+# Airflow hooks and operators
+from airflow.hooks.base import BaseHook
+from airflow.providers.google.cloud.operators.bigquery import (
+    BigQueryCreateEmptyDatasetOperator,
+    BigQueryCreateEmptyTableOperator,
+    BigQueryInsertJobOperator,
+)
+
+
+@dag(
+    schedule="@daily",
+    catchup=True,
+    start_date=pendulum.datetime(2024, 10, 1, tz="UTC"),
+    dagrun_timeout=datetime.timedelta(minutes=20),
+    tags=["habits", "notion", "staging"],
+    # Provide tables via params to enable unit testing
+    params={"daily_habit_table": Param("notion_habits_daily")},
+)
+def NotionHabits():
+
+    # Notion package and details
+    from notion_client import Client
+
+    NOTION_CONN_ID = "notion_productivity"
+    DAILY_HABIT_DB = "10140b50-0f0d-43d2-905a-7ed714ef7f2c"
+    WEEKLY_HABIT_DB = "11e09eb8-3f76-80e7-8fac-e8d0bb538fb0"
+
+    # BigQuery connection details
+    BQ_CONN_ID = "bigquery_reporting"
+    BQ_DATASET = "staging"
+    BQ_LOCATION = "us-central1"
+
+    # Create staging dataset if it doesn't exist
+    create_staging_dataset = BigQueryCreateEmptyDatasetOperator(
+        task_id="create_staging_dataset",
+        gcp_conn_id=BQ_CONN_ID,
+        dataset_id=BQ_DATASET,
+        location=BQ_LOCATION,
+        if_exists="ignore",
+    )
+
+    # Create daily habits staging table if it doesn't exist
+    create_daily_table = BigQueryCreateEmptyTableOperator(
+        task_id="create_staging_notion_habits_daily_table",
+        gcp_conn_id=BQ_CONN_ID,
+        dataset_id=BQ_DATASET,
+        table_id="{{ params.daily_habit_table }}",
+        schema_fields=[
+            {
+                "name": "bq_id",
+                "type": "STRING",
+                "mode": "REQUIRED",
+                "defaultValueExpression": "GENERATE_UUID()",
+            },
+            {"name": "page_id", "type": "STRING", "mode": "REQUIRED"},
+            {"name": "database_id", "type": "STRING", "mode": "REQUIRED"},
+            {"name": "date", "type": "DATE", "mode": "REQUIRED"},
+            {"name": "habit", "type": "STRING", "mode": "REQUIRED"},
+            {"name": "is_complete", "type": "BOOLEAN", "mode": "REQUIRED"},
+            {"name": "page_created", "type": "TIMESTAMP", "mode": "REQUIRED"},
+            {"name": "page_edited", "type": "TIMESTAMP", "mode": "REQUIRED"},
+            {
+                "name": "bq_ts",
+                "type": "TIMESTAMP",
+                "mode": "REQUIRED",
+                "defaultValueExpression": "CURRENT_TIMESTAMP()",
+            },
+        ],
+        if_exists="ignore",
+    )
+
+    @task(
+        task_id="load_notion_habits_daily_tasks",
+    )
+    def get_daily_tasks(params: dict):
+        # Get task context for data interval
+        context = get_current_context()
+        dag = context["dag"]
+        # Connect to Notion API
+        connection = BaseHook.get_connection(NOTION_CONN_ID)
+        client = Client(auth=connection.password)
+
+        notion_filters = {
+            # Added formula property in database for last edited date to allow filter
+            # Use data interval to get records edited within interval
+            "and": [
+                {
+                    "property": "Last Edited",
+                    "date": {"after": context["data_interval_start"].isoformat()},
+                },
+                {
+                    "property": "Last Edited",
+                    "date": {"before": context["data_interval_end"].isoformat()},
+                },
+            ]
+        }
+        # Query the daily habits database
+        query_rs = client.databases.query(
+            database_id=DAILY_HABIT_DB,
+            filter=notion_filters,
+        )
+
+        # Pivot the results to a row per habit
+        results = [
+            (
+                r["id"],  # Notion page ID
+                DAILY_HABIT_DB,
+                r["properties"]["Date"]["date"]["start"][:10],  # Notion task date
+                p,  # Notion habit name
+                r["created_time"],  # Notion page creation time
+                r["last_edited_time"],  # Notion page last edited time
+                v["checkbox"],  # Habit completion status
+            )
+            for r in query_rs["results"]
+            # Include only checkbox properties
+            for p, v in r["properties"].items()
+            if v["type"] == "checkbox"
+        ]
+        dag.log.info(
+            f"{len(results)} daily habits updated for {context['execution_date']}"
+        )
+
+        if not results:
+            raise AirflowSkipException("No daily habits found for the interval.")
+        else:
+            # Structure row data for insert query
+            data_rows = ", ".join(
+                f"({', '.join([repr(v) for v in row])})" for row in results
+            )
+            insert_rows_query = f"""
+            INSERT INTO {BQ_DATASET}.{params["daily_habit_table"]}
+            (page_id, database_id, date, habit, page_created, page_edited, is_complete)
+            VALUES {data_rows};
+            """
+
+            # Insert daily habits into BigQuery
+            # TODO: Break into separate task - how to pass data?
+            insert_daily_job = BigQueryInsertJobOperator(
+                gcp_conn_id=BQ_CONN_ID,
+                task_id="insert_query_job",
+                configuration={
+                    "query": {
+                        "query": insert_rows_query,
+                        "useLegacySql": False,
+                        "priority": "BATCH",
+                    }
+                },
+                location=BQ_LOCATION,
+            )
+            insert_daily_job.execute(context=context)
+
+    # Define task flow - task functions must be called ()
+    create_staging_dataset >> create_daily_table >> get_daily_tasks()
+
+
+# Assign the dag to global for execution
+dag = NotionHabits()
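Relating this DAG back to the README's testing step, a minimal pytest sketch (hypothetical test module, not part of this commit) that loads the `dags/` folder above and checks the DAG's integrity:

# tests/test_notion_habits.py - hypothetical DAG integrity test following the
# Airflow best-practices pattern linked in the README
from airflow.models import DagBag


def test_notion_habits_dag_loads():
    # Load DAGs from the repository folder, excluding Airflow's example DAGs
    dag_bag = DagBag(dag_folder="dags", include_examples=False)

    # The module should import cleanly
    assert not dag_bag.import_errors

    # The DAG id defaults to the decorated function name (NotionHabits)
    dag = dag_bag.get_dag("NotionHabits")
    assert dag is not None

    # Dataset creation, table creation, and the load task should all be registered
    assert len(dag.tasks) == 3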
