
Commit 18187d2

Merge pull request #12 from michaelconan/feat/notion-habits
Feat/notion habits
2 parents ac645a2 + f121ccd commit 18187d2

34 files changed (+1095 −221 lines)

.devcontainer/devcontainer.json

+2 −1

@@ -10,7 +10,8 @@
       "extensions": [
         "ms-python.python",
         "GitHub.copilot-chat",
-        "ms-python.black-formatter"
+        "ms-python.black-formatter",
+        "bierner.markdown-mermaid"
       ]
     }
   },

.devcontainer/docker-compose.yml

+1 −1

@@ -3,14 +3,14 @@ version: '3.8'
 services:
   app:
     environment:
+      AIRFLOW_HOME: "/workspaces/personal-reporting"
       AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://postgres:postgres@db/postgres
       AIRFLOW__DATABASE__LOAD_DEFAULT_CONNECTIONS: "false"
       AIRFLOW__CORE__EXECUTOR: LocalExecutor
       AIRFLOW__CORE__FERNET_KEY: ""
       AIRFLOW__CORE__LOAD_EXAMPLES: "false"
       AIRFLOW__WEBSERVER__INSTANCE_NAME: "Local Instance"
       AIRFLOW__WEBSERVER__EXPOSE_CONFIG: "true"
-      AIRFLOW__CORE__DAGS_FOLDER: "/workspaces/personal-reporting/dags"
     build:
       context: .
       dockerfile: Dockerfile
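
Dropping the explicit `AIRFLOW__CORE__DAGS_FOLDER` works because Airflow derives `dags_folder` from `AIRFLOW_HOME` when no override is set, defaulting to `$AIRFLOW_HOME/dags`. A minimal sketch to confirm the resolved value inside the devcontainer; it uses Airflow's configuration API and assumes the `AIRFLOW_HOME` value set in the compose file above.

```python
# Check the resolved DAGs folder inside the devcontainer.
# Assumes AIRFLOW_HOME=/workspaces/personal-reporting as in docker-compose.yml;
# with no explicit override, Airflow defaults dags_folder to $AIRFLOW_HOME/dags.
from airflow.configuration import conf

dags_folder = conf.get("core", "dags_folder")
print(dags_folder)  # expected: /workspaces/personal-reporting/dags
```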

.gitignore

+14 −1

@@ -1,7 +1,20 @@
+# Cache data
 __pycache__
+.pytest_cache
+.coverage
+
+# Logs
 logs
+*.log
+
+# Data files
 *.csv
 
 # Secrets and variables
 .env
-config/*
+config/*
+
+# DBT compiled files
+dbt/*/target
+dbt/*/dbt_modules
+dbt/*/dbt_packages

.vscode/settings.json

+5 −2

@@ -2,12 +2,15 @@
   "python.testing.pytestArgs": [
     "tests",
     // Coverage of source code
-    "--cov=src/",
+    "--cov=dags/",
     // INFO level pytest logging
     "--log-cli-level=INFO",
   ],
   "python.testing.unittestEnabled": false,
   "python.testing.pytestEnabled": true,
   "editor.defaultFormatter": "ms-python.black-formatter",
-  "editor.formatOnSave": true
+  "editor.formatOnSave": true,
+  "[json]": {
+    "editor.defaultFormatter": "vscode.json-language-features"
+  }
 }

README.md

+41

@@ -14,6 +14,47 @@ The following data will be ingested from my personal systems into a BigQuery warehouse:
 2. HubSpot
 3. Google Contacts
 
+### Warehouse Data Flow
+
+```mermaid
+graph TB
+
+    %% Sources
+    S1[Notion]
+    S2[HubSpot]
+
+    subgraph staging
+        direction TB
+        L1[Daily Habits]
+        L2[Weekly Habits]
+        L3[Contacts]
+        L4[Companies]
+        L5[Engagements]
+    end
+
+    %% Source to Staging Flows
+    S1 --> L1
+    S1 --> L2
+    S2 --> L3
+    S2 --> L4
+    S2 --> L5
+
+    subgraph cleansed
+        C1[Habits]
+    end
+
+    %% Staging to Cleansed Flows
+    L1 --> C1
+    L2 --> C1
+```
+
+## Frameworks
+
+1. [Alembic](https://alembic.sqlalchemy.org/en/latest/) database migrations for raw tables, which should generally match the schema of the source system, run via the [Airflow provider package](https://pypi.org/project/airflow-provider-alembic/)
+2. [Airflow](https://airflow.apache.org/) to orchestrate data loading scripts and additional automated workflows
+3. [dbt Core](https://docs.getdbt.com/) to define data models and transformations, again orchestrated by Airflow (via the CLI / `BashOperator`)
+
+
 ## Setup
 
 ### Airflow Setup
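
The Frameworks list above maps onto the orchestration pattern used by the DAGs later in this diff: Alembic manages the raw tables through `AlembicOperator`, and dbt runs through a plain `BashOperator`. Below is a condensed, illustrative sketch of that pattern; the connection ID, schedule, and paths are assumptions, it presumes a dbt project and profile already configured on the worker, and the full implementations are in `dags/michael/migrate.py` and `dags/michael/dbt.py` further down.

```python
# Condensed sketch of the Alembic + dbt orchestration described above.
# Connection ID, schedule, and script_location are illustrative assumptions.
import pendulum
from airflow.decorators import dag
from airflow.operators.bash import BashOperator
from airflow_provider_alembic.operators.alembic import AlembicOperator


@dag(schedule="@daily", start_date=pendulum.datetime(2025, 1, 1), catchup=False)
def reporting_pipeline():
    # Apply raw-layer schema migrations (Frameworks item 1)
    migrate = AlembicOperator(
        task_id="alembic_upgrade",
        conn_id="bigquery_reporting",
        command="upgrade",
        revision="head",
        script_location="dags/michael/migrations",
    )

    # Build dbt models from staging to cleansed (Frameworks item 3);
    # assumes dbt profiles are already available to the worker
    dbt_run = BashOperator(task_id="dbt_run", bash_command="dbt run")

    migrate >> dbt_run


reporting_pipeline()
```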

dags/.airflowignore

+4

@@ -0,0 +1,4 @@
+# Modules to ignore for DAG discovery using REGEX
+
+# Common tasks to import into dags
+reporting/common/.*

dags/michael/__init__.py

Whitespace-only changes.

dags/michael/common/__init__.py

Whitespace-only changes.

dags/michael/common/bigquery.py

+36

@@ -0,0 +1,36 @@
+"""
+bigquery.py
+
+Common functions to interact with Google BigQuery in DAG tasks.
+"""
+
+# Airflow hooks and operators
+from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
+
+# Additional libraries
+from google.cloud import bigquery
+
+
+def load_file_to_bq(conn_id: str, file_path: str, table_id: str) -> str:
+    """Insert newline-delimited JSON file (JSONL) into BigQuery table
+
+    Args:
+        conn_id (str): Airflow connection ID for BigQuery.
+        file_path (str): Path to the file to load.
+        table_id (str): Table ID to load the data into.
+
+    Returns:
+        str: State of completed insert job.
+    """
+    # Load the data into BigQuery
+    connection = BigQueryHook(gcp_conn_id=conn_id)
+    client = connection.get_client()
+
+    job_config = bigquery.LoadJobConfig(
+        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
+    )
+    with open(file_path, "rb") as source_file:
+        job = client.load_table_from_file(source_file, table_id, job_config=job_config)
+    job.result()
+
+    return job.state
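
`load_file_to_bq` is meant to be shared across ingestion DAGs: one task writes records to a JSONL file, then passes the path and a fully qualified table ID to the helper. A minimal usage sketch follows; the DAG, task names, file path, project, and table are illustrative, the `bigquery_reporting` connection ID matches the one used elsewhere in this commit, and the import path assumes Airflow has placed the `dags/` folder on `sys.path`.

```python
# Illustrative caller for load_file_to_bq; names, paths, and table IDs are assumptions.
import json
import pendulum
from airflow.decorators import dag, task

from michael.common.bigquery import load_file_to_bq


@dag(schedule="@daily", start_date=pendulum.datetime(2025, 1, 1), catchup=False)
def notion_habits_example():
    @task
    def extract_to_jsonl() -> str:
        # Write one JSON object per line (JSONL), as the loader expects
        path = "/tmp/daily_habits.jsonl"
        rows = [{"habit": "exercise", "done": True}]
        with open(path, "w") as f:
            for row in rows:
                f.write(json.dumps(row) + "\n")
        return path

    @task
    def load(path: str) -> str:
        return load_file_to_bq(
            conn_id="bigquery_reporting",
            file_path=path,
            table_id="my-project.staging.notion__daily_habits",  # hypothetical table
        )

    load(extract_to_jsonl())


notion_habits_example()
```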

dags/michael/dbt.py

+86

@@ -0,0 +1,86 @@
+# Base imports
+import os
+import shutil
+import pendulum
+import datetime
+
+# PyPI imports
+import yaml
+
+# Airflow imports
+from airflow.decorators import dag, task
+from airflow.operators.bash import BashOperator
+from airflow.hooks.base import BaseHook
+
+# Dataset name in BigQuery for DBT
+DBT_DATASET = "reporting"
+
+
+@dag(
+    schedule="@daily",
+    catchup=False,
+    start_date=pendulum.datetime(2025, 1, 1),
+    dagrun_timeout=datetime.timedelta(minutes=20),
+)
+def DBT():
+
+    # File paths for service account key and dbt profile
+    PROFILES_DIR = "/tmp/.dbt"
+    KEYFILE_PATH = os.path.join(PROFILES_DIR, "bq-service-account.json")
+    PROFILE_PATH = os.path.join(PROFILES_DIR, "dbt_profile.yml")
+
+    @task(
+        task_id="generate_dbt_profile",
+    )
+    def generate_dbt_profile():
+        # Get BigQuery connection details
+        conn = BaseHook.get_connection("bigquery_reporting")
+
+        # Write keyfile to temporary file
+        os.makedirs(os.path.dirname(KEYFILE_PATH), exist_ok=True)
+        with open(KEYFILE_PATH, "w") as f:
+            f.write(conn.extra_dejson.get("keyfile_dict"))
+
+        # Generate profile with BigQuery details
+        profile = {
+            "michael": {
+                "outputs": {
+                    "dev": {
+                        "type": "bigquery",
+                        "method": "service-account",
+                        "keyfile": KEYFILE_PATH,
+                        "dataset": DBT_DATASET,
+                        "project": conn.extra_dejson.get("project"),
+                        "location": conn.extra_dejson.get("location"),
+                        "priority": "interactive",
+                        "job_execution_timeout_seconds": 300,
+                        "job_retries": 1,
+                        "threads": 1,
+                    },
+                },
+                "target": "dev",
+            }
+        }
+        # Create profile file for dbt run
+        with open(PROFILE_PATH, "w") as f:
+            yaml.dump(profile, f)
+
+    dbt_run = BashOperator(
+        task_id="dbt_run",
+        bash_command=f"dbt run --profiles-dir {PROFILES_DIR}",
+        env={"DBT_PROFILES_DIR": PROFILES_DIR},
+    )
+
+    @task(
+        task_id="cleanup_files",
+    )
+    def cleanup_files():
+        # Remove temporary profile directory and keyfile
+        shutil.rmtree(PROFILES_DIR)
+
+    # Define DAG workflow
+    generate_dbt_profile() >> dbt_run >> cleanup_files()
+
+
+# Instantiate the DAG so Airflow can discover it
+DBT()
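
As wired above, `cleanup_files` only runs when `dbt_run` succeeds, so a failed dbt invocation would leave the keyfile under `/tmp/.dbt`. One way to make the cleanup unconditional is Airflow's `trigger_rule`; the sketch below is an assumption about desired behaviour, not part of the commit, and reuses the same `PROFILES_DIR` path.

```python
# Sketch: run cleanup regardless of upstream task state.
# Assumes the same PROFILES_DIR as the DAG above; "all_done" fires the task
# once upstream tasks finish, whether they succeeded or failed.
import shutil
from airflow.decorators import task

PROFILES_DIR = "/tmp/.dbt"


@task(task_id="cleanup_files", trigger_rule="all_done")
def cleanup_files():
    # Remove the generated profile and service-account keyfile
    shutil.rmtree(PROFILES_DIR, ignore_errors=True)
```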

dags/michael/migrate.py

+47

@@ -0,0 +1,47 @@
+# Base imports
+import os
+from datetime import datetime
+
+# Airflow imports
+from airflow.models import DAG
+from airflow.decorators import task
+from airflow.models.param import Param
+
+# Airflow hooks and operators
+from airflow.hooks.base import BaseHook
+from airflow_provider_alembic.operators.alembic import AlembicOperator
+
+# Get migration folder relative to DAG
+migration_folder = os.path.join(os.path.dirname(__file__), "migrations")
+
+with DAG(
+    "raw_migrations",
+    schedule="@once",  # also consider "None"
+    start_date=datetime(1970, 1, 1),
+    params={"command": Param("upgrade"), "revision": Param("head")},
+) as dag:
+
+    KEYFILE_PATH = "/tmp/bq-service-account.json"
+    BIGQUERY_CONN_ID = "bigquery_reporting"
+    # Set keyfile as application default credentials
+    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEYFILE_PATH
+
+    @task(task_id="bigquery_keyfile")
+    def get_keyfile():
+        # Get BigQuery connection details
+        conn = BaseHook.get_connection(BIGQUERY_CONN_ID)
+
+        # Write keyfile to temporary file
+        with open(KEYFILE_PATH, "w") as f:
+            f.write(conn.extra_dejson.get("keyfile_dict"))
+
+    # Run migrations
+    alembic_op = AlembicOperator(
+        task_id="alembic_op",
+        conn_id=BIGQUERY_CONN_ID,
+        command="{{ params.command }}",
+        revision="{{ params.revision }}",
+        script_location=migration_folder,
+    )
+
+    get_keyfile() >> alembic_op
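
The `AlembicOperator` points at `dags/michael/migrations`, so the raw tables come from revision scripts in that folder's `versions/` directory. Below is a hypothetical revision sketch for a raw Notion habits table, included only to show the expected shape of such a script; the table name, columns, and revision ID are illustrative and not taken from this commit.

```python
# Hypothetical Alembic revision for a raw-layer table; identifiers are examples only.
"""create notion daily habits table

Revision ID: 0001_notion_daily_habits
Revises:
"""
import sqlalchemy as sa
from alembic import op

revision = "0001_notion_daily_habits"
down_revision = None


def upgrade() -> None:
    # Raw table mirroring the assumed shape of the Notion source data
    op.create_table(
        "notion__daily_habits",
        sa.Column("id", sa.String, primary_key=True),
        sa.Column("date", sa.Date),
        sa.Column("habit", sa.String),
        sa.Column("completed", sa.Boolean),
    )


def downgrade() -> None:
    op.drop_table("notion__daily_habits")
```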

dags/michael/migrations/README

+1

@@ -0,0 +1 @@
+Generic single-database configuration.
