Add Apache Airflow 2.1.0 to the project dependencies
xuwenyihust committed May 8, 2024
1 parent b8527b0 commit b38e759
Showing 9 changed files with 183 additions and 20 deletions.
28 changes: 28 additions & 0 deletions .github/workflows/build-docker-airflow.yml
@@ -0,0 +1,28 @@
name: Build Docker - Airflow

on:
  push:
    paths:
      - 'docker/airflow/**'
jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Log in to Docker Hub
        uses: docker/login-action@v1
        with:
          username: ${{ secrets.DOCKERHUB_USERNAME }}
          password: ${{ secrets.DOCKERHUB_PASSWORD }}

      - name: Docker Build & Push
        run: |
          timestamp=$(date +"%Y%m%d%H%M%S")
          docker build -t ${{ secrets.DOCKERHUB_USERNAME }}/airflow:$timestamp -f docker/airflow/Dockerfile ./docker/airflow
          docker tag ${{ secrets.DOCKERHUB_USERNAME }}/airflow:$timestamp ${{ secrets.DOCKERHUB_USERNAME }}/airflow:latest
          docker push ${{ secrets.DOCKERHUB_USERNAME }}/airflow:$timestamp
          docker push ${{ secrets.DOCKERHUB_USERNAME }}/airflow:latest
2 changes: 2 additions & 0 deletions README.md
@@ -79,6 +79,8 @@ DataPulse is a platform for big data and AI. It is based on Apache Spark and Kub
  - 3.5.0
- Delta Lake
  - 3.0.0
- Airflow
  - 2.1.0

## License
This project is licensed under the terms of the MIT license.
25 changes: 25 additions & 0 deletions dags/demo.py
@@ -0,0 +1,25 @@
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    # note: catchup is a DAG-level setting (set below), not a task default
}

dag = DAG(
    'demo_dag',
    default_args=default_args,
    description='A simple DAG for demo',
    schedule_interval='@daily',
    catchup=False,
)

run_script = BashOperator(
    task_id='display_logs',
    bash_command='python /opt/airflow/examples/airflow_demo.py',
    dag=dag,
)

run_script
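A quick way to validate the DAG above without running the scheduler is a DagBag parse check. A minimal sketch, assuming a local Airflow 2.1 install and that it is run from the repository root (where dags/ lives); not part of the commit:

# Parse dags/ and fail loudly on import errors.
from airflow.models import DagBag

dag_bag = DagBag(dag_folder="dags", include_examples=False)
assert not dag_bag.import_errors, dag_bag.import_errors
print(dag_bag.get_dag("demo_dag").task_ids)  # expected: ['display_logs']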
30 changes: 30 additions & 0 deletions dags/sg_resale_flat_prices.py
@@ -0,0 +1,30 @@
from airflow import DAG
from airflow.providers.papermill.operators.papermill import PapermillOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2023, 1, 1),
    # note: catchup is a DAG-level setting (set below), not a task default
}

with DAG(
    'SG_Resale_Flat_Prices',
    default_args=default_args,
    description='DAG for analysis on Singapore resale flat prices',
    schedule_interval=timedelta(days=1),
    catchup=False,
) as dag:

    run_notebook = PapermillOperator(
        task_id='sg_resale_flat_prices_notebook',
        input_nb='/opt/airflow/examples/sg-resale-flat-prices/sg_resale_flat_prices.ipynb',
        output_nb='/opt/airflow/examples/sg-resale-flat-prices/output/output-notebook-{{ execution_date }}.ipynb',
    )

    run_notebook
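PapermillOperator is a thin wrapper around the papermill library. A rough standalone equivalent of the task above, assuming papermill is installed and substituting a fixed, hypothetical output name for the templated {{ execution_date }}:

# Execute the notebook directly with papermill; paths match the DAG above,
# "output-notebook-manual.ipynb" is a placeholder name for this sketch.
import papermill as pm

pm.execute_notebook(
    '/opt/airflow/examples/sg-resale-flat-prices/sg_resale_flat_prices.ipynb',
    '/opt/airflow/examples/sg-resale-flat-prices/output/output-notebook-manual.ipynb',
)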
104 changes: 84 additions & 20 deletions docker-compose.yaml
@@ -69,44 +69,108 @@ services:
           cpus: "1"
           memory: 1g
 
-  spark-worker-2:
-    image: wenyixu101/spark:latest
-    container_name: spark-worker-2
+  # spark-worker-2:
+  #   image: wenyixu101/spark:latest
+  #   container_name: spark-worker-2
+  #   ports:
+  #     - "8082:8082"
+  #   environment:
+  #     - SPARK_MASTER_HOST=spark-master
+  #     - SPARK_MODE=worker
+  #     - SPARK_WORKER_PORT=8082
+  #     - SPARK_WORKER_WEBUI_PORT=8082
+  #     - SPARK_HISTORY_OPTS=-Dspark.history.fs.logDirectory=/opt/data/spark-events
+  #   command: "/opt/spark/bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077"
+  #   volumes:
+  #     - ./data/spark-events:/opt/data/spark-events
+  #     - ./data/spark-warehouse:/opt/data/spark-warehouse
+  #     - ./datasets:/opt/data/datasets
+  #   pull_policy: always
+  #   deploy:
+  #     resources:
+  #       limits:
+  #         cpus: "1"
+  #         memory: 1g
+
+  history-server:
+    image: wenyixu101/history-server:latest
+    container_name: history-server
     ports:
-      - "8082:8082"
+      - "18080:18080"
     environment:
-      - SPARK_MASTER_HOST=spark-master
-      - SPARK_MODE=worker
-      - SPARK_WORKER_PORT=8082
-      - SPARK_WORKER_WEBUI_PORT=8082
-      - SPARK_HISTORY_OPTS=-Dspark.history.fs.logDirectory=/opt/data/spark-events
-    command: "/opt/spark/bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077"
+      - SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=file:/opt/data/spark-events"
+      - SPARK_MODE=history-server
+    command: "/opt/spark/bin/spark-class org.apache.spark.deploy.history.HistoryServer"
     volumes:
       - ./data/spark-events:/opt/data/spark-events
-      - ./data/spark-warehouse:/opt/data/spark-warehouse
-      - ./datasets:/opt/data/datasets
     pull_policy: always
     deploy:
       resources:
         limits:
           cpus: "1"
           memory: 1g
 
-  history-server:
-    image: wenyixu101/history-server:latest
-    container_name: history-server
+  airflow-webserver:
+    image: wenyixu101/airflow:2.1.0
+    container_name: airflow-webserver
+    environment:
+      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
+      - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@airflow-postgres/airflow
+      - AIRFLOW__CORE__FERNET_KEY=<your_fernet_key>
     ports:
-      - "18080:18080"
+      - "8090:8080"
+    command: >
+      bash -c "airflow db upgrade &&
+      airflow users create --username admin --firstname YourFirstName --lastname YourLastName --role Admin --email [email protected] --password yourpassword &&
+      airflow webserver"
+    volumes:
+      - ./dags:/opt/airflow/dags
+      - ./examples:/opt/airflow/examples
+    pull_policy: always
+    depends_on:
+      - airflow-scheduler
+    deploy:
+      resources:
+        limits:
+          cpus: "1"
+          memory: 1g
+
+  airflow-scheduler:
+    image: wenyixu101/airflow:2.1.0
+    container_name: airflow-scheduler
     environment:
-      - SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=file:/opt/data/spark-events"
-      - SPARK_MODE=history-server
-    command: "/opt/spark/bin/spark-class org.apache.spark.deploy.history.HistoryServer"
+      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
+      - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@airflow-postgres/airflow
+      - AIRFLOW__CORE__FERNET_KEY=<your_fernet_key>
+    command: scheduler
     volumes:
-      - ./data/spark-events:/opt/data/spark-events
+      - ./dags:/opt/airflow/dags
+      - ./examples:/opt/airflow/examples
     pull_policy: always
+    depends_on:
+      - airflow-postgres
     deploy:
       resources:
         limits:
           cpus: "1"
           memory: 1g
+
+  airflow-postgres:
+    image: postgres:13
+    container_name: airflow-postgres
+    environment:
+      - POSTGRES_USER=airflow
+      - POSTGRES_PASSWORD=airflow
+      - POSTGRES_DB=airflow
+    ports:
+      - "5432:5432"
+    volumes:
+      - ./data/postgres:/var/lib/postgresql/data
+    pull_policy: always
+    deploy:
+      resources:
+        limits:
+          cpus: "1"
+          memory: 1g
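The <your_fernet_key> placeholder above must be replaced with a real key before Airflow can encrypt connection credentials. A minimal sketch for generating one, assuming the cryptography package (already a dependency of Airflow itself):

# Print a fresh key to paste into AIRFLOW__CORE__FERNET_KEY.
from cryptography.fernet import Fernet

print(Fernet.generate_key().decode())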
3 changes: 3 additions & 0 deletions docker/airflow/Dockerfile
@@ -0,0 +1,3 @@
FROM apache/airflow:2.1.0

RUN pip install apache-airflow-providers-papermill
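A minimal import check, run with the built image's Python interpreter, confirms the provider installed by this Dockerfile is visible to Airflow; this is a sketch, not part of the commit:

# Fails with ImportError if apache-airflow-providers-papermill is missing.
from airflow.providers.papermill.operators.papermill import PapermillOperator
print(PapermillOperator.template_fields)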
11 changes: 11 additions & 0 deletions examples/airflow_demo.py
@@ -0,0 +1,11 @@
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def main():
    logger.info("Starting task...")
    logger.info("Task completed successfully!")

if __name__ == "__main__":
    main()
File renamed without changes.
Empty file added requirements.txt
Empty file.
