From b38e7593d6eb55df6cdbd8a4e74f55b8ada93d4b Mon Sep 17 00:00:00 2001
From: xuwenyihust
Date: Wed, 8 May 2024 15:40:09 +0800
Subject: [PATCH] Add Apache Airflow 2.1.0 to the project dependencies

---
 .github/workflows/build-docker-airflow.yml |  28 ++++++
 README.md                                  |   2 +
 dags/demo.py                               |  25 +++++
 dags/sg_resale_flat_prices.py              |  30 ++++++
 docker-compose.yaml                        | 104 +++++++++++++++++----
 docker/airflow/Dockerfile                  |   3 +
 examples/airflow_demo.py                   |  11 +++
 examples/{test.ipynb => demo.ipynb}        |   0
 requirements.txt                           |   0
 9 files changed, 183 insertions(+), 20 deletions(-)
 create mode 100644 .github/workflows/build-docker-airflow.yml
 create mode 100755 dags/demo.py
 create mode 100644 dags/sg_resale_flat_prices.py
 create mode 100644 docker/airflow/Dockerfile
 create mode 100644 examples/airflow_demo.py
 rename examples/{test.ipynb => demo.ipynb} (100%)
 create mode 100644 requirements.txt

diff --git a/.github/workflows/build-docker-airflow.yml b/.github/workflows/build-docker-airflow.yml
new file mode 100644
index 0000000..2ecb14b
--- /dev/null
+++ b/.github/workflows/build-docker-airflow.yml
@@ -0,0 +1,28 @@
+name: Build Docker - Airflow
+
+on:
+  push:
+    paths:
+      - 'docker/airflow/**'
+jobs:
+  build:
+    runs-on: ubuntu-latest
+    steps:
+    - uses: actions/checkout@v3
+
+    - name: Log in to Docker Hub
+      uses: docker/login-action@v1
+      with:
+        username: ${{ secrets.DOCKERHUB_USERNAME }}
+        password: ${{ secrets.DOCKERHUB_PASSWORD }}
+
+    - name: Docker Build & Push
+      run: |
+        timestamp=$(date +"%Y%m%d%H%M%S")
+
+        docker build -t ${{ secrets.DOCKERHUB_USERNAME }}/airflow:$timestamp -f docker/airflow/Dockerfile ./docker/airflow
+        docker tag ${{ secrets.DOCKERHUB_USERNAME }}/airflow:$timestamp ${{ secrets.DOCKERHUB_USERNAME }}/airflow:latest
+
+        docker push ${{ secrets.DOCKERHUB_USERNAME }}/airflow:$timestamp
+        docker push ${{ secrets.DOCKERHUB_USERNAME }}/airflow:latest
+

diff --git a/README.md b/README.md
index 8167b11..140e4d3 100644
--- a/README.md
+++ b/README.md
@@ -79,6 +79,8 @@ DataPulse is a platform for big data and AI. It is based on Apache Spark and Kub
   - 3.5.0
 - Delta Lake
   - 3.0.0
+- Airflow
+  - 2.1.0
 
 ## License
 This project is licensed under the terms of the MIT license.
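Note on the workflow above: the build-tag-push sequence can be reproduced locally before CI runs, which is handy for validating docker/airflow/Dockerfile. The sketch below mirrors the job's timestamp tagging scheme; DOCKER_USER is a placeholder for the DOCKERHUB_USERNAME secret, and the docker CLI is assumed to be installed and logged in. Also worth noting: the workflow pins docker/login-action@v1, an older major version than the actions/checkout@v3 used beside it.

# Hypothetical local equivalent of the CI job's timestamp tagging scheme
# (a sketch, not part of the patch).
import subprocess
from datetime import datetime

DOCKER_USER = "your-dockerhub-username"  # placeholder for the DOCKERHUB_USERNAME secret
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
image = f"{DOCKER_USER}/airflow:{timestamp}"

# Build the image from the same Dockerfile and context as the CI job.
subprocess.run(["docker", "build", "-t", image,
                "-f", "docker/airflow/Dockerfile", "docker/airflow"], check=True)
# Re-tag the timestamped build as latest, as the workflow does before pushing.
subprocess.run(["docker", "tag", image, f"{DOCKER_USER}/airflow:latest"], check=True)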
diff --git a/dags/demo.py b/dags/demo.py
new file mode 100755
index 0000000..0661cf3
--- /dev/null
+++ b/dags/demo.py
@@ -0,0 +1,25 @@
+from airflow import DAG
+from airflow.operators.bash import BashOperator
+from datetime import datetime
+
+default_args = {
+    'owner': 'airflow',
+    'start_date': datetime(2023, 1, 1),
+    'catchup': True
+}
+
+dag = DAG(
+    'demo_dag',
+    default_args=default_args,
+    description='A simple DAG for demo',
+    schedule_interval='@daily',
+    catchup=False,
+)
+
+run_script = BashOperator(
+    task_id='display_logs',
+    bash_command='python /opt/airflow/examples/airflow_demo.py',
+    dag=dag,
+)
+
+run_script
\ No newline at end of file
diff --git a/dags/sg_resale_flat_prices.py b/dags/sg_resale_flat_prices.py
new file mode 100644
index 0000000..02c97ab
--- /dev/null
+++ b/dags/sg_resale_flat_prices.py
@@ -0,0 +1,30 @@
+from airflow import DAG
+from airflow.providers.papermill.operators.papermill import PapermillOperator
+from datetime import datetime, timedelta
+
+default_args = {
+    'owner': 'airflow',
+    'depends_on_past': False,
+    'email_on_failure': False,
+    'email_on_retry': False,
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5),
+    'start_date': datetime(2023, 1, 1),
+    'catchup': True
+}
+
+with DAG(
+    'SG_Resale_Flat_Prices',
+    default_args=default_args,
+    description='DAG for analysis on Singapore resale flat prices',
+    schedule_interval=timedelta(days=1),
+    catchup=False,
+) as dag:
+
+    run_notebook = PapermillOperator(
+        task_id='sg_resale_flat_prices_notebook',
+        input_nb='/opt/airflow/examples/sg-resale-flat-prices/sg_resale_flat_prices.ipynb',
+        output_nb='/opt/airflow/examples/sg-resale-flat-prices/output/output-notebook-{{ execution_date }}.ipynb'
+    )
+
+run_notebook
\ No newline at end of file
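Two small issues in the DAG files above are worth flagging. First, catchup is a DAG-level argument, so the 'catchup': True entry in default_args (present in both files) is ignored by the operators and the DAG-level catchup=False wins. Second, the bare run_script / run_notebook expression on each file's last line is a no-op; the operator is already registered with the DAG at construction time. A cleaned-up sketch of demo.py (not part of the patch) could look like this:

# Sketch of dags/demo.py with the catchup ambiguity removed and the
# trailing bare expression dropped (not part of the patch).
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    'demo_dag',
    default_args={'owner': 'airflow'},
    description='A simple DAG for demo',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False,  # explicit: no backfill of runs since start_date
) as dag:
    run_script = BashOperator(
        task_id='display_logs',
        bash_command='python /opt/airflow/examples/airflow_demo.py',
    )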
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 8c32d7b..9c56444 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -69,22 +69,40 @@ services:
         cpus: "1"
         memory: 1g
 
-  spark-worker-2:
-    image: wenyixu101/spark:latest
-    container_name: spark-worker-2
+  # spark-worker-2:
+  #   image: wenyixu101/spark:latest
+  #   container_name: spark-worker-2
+  #   ports:
+  #     - "8082:8082"
+  #   environment:
+  #     - SPARK_MASTER_HOST=spark-master
+  #     - SPARK_MODE=worker
+  #     - SPARK_WORKER_PORT=8082
+  #     - SPARK_WORKER_WEBUI_PORT=8082
+  #     - SPARK_HISTORY_OPTS=-Dspark.history.fs.logDirectory=/opt/data/spark-events
+  #   command: "/opt/spark/bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077"
+  #   volumes:
+  #     - ./data/spark-events:/opt/data/spark-events
+  #     - ./data/spark-warehouse:/opt/data/spark-warehouse
+  #     - ./datasets:/opt/data/datasets
+  #   pull_policy: always
+  #   deploy:
+  #     resources:
+  #       limits:
+  #         cpus: "1"
+  #         memory: 1g
+
+  history-server:
+    image: wenyixu101/history-server:latest
+    container_name: history-server
     ports:
-      - "8082:8082"
+      - "18080:18080"
     environment:
-      - SPARK_MASTER_HOST=spark-master
-      - SPARK_MODE=worker
-      - SPARK_WORKER_PORT=8082
-      - SPARK_WORKER_WEBUI_PORT=8082
-      - SPARK_HISTORY_OPTS=-Dspark.history.fs.logDirectory=/opt/data/spark-events
-    command: "/opt/spark/bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077"
+      - SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=file:/opt/data/spark-events"
+      - SPARK_MODE=history-server
+    command: "/opt/spark/bin/spark-class org.apache.spark.deploy.history.HistoryServer"
     volumes:
       - ./data/spark-events:/opt/data/spark-events
-      - ./data/spark-warehouse:/opt/data/spark-warehouse
-      - ./datasets:/opt/data/datasets
     pull_policy: always
     deploy:
       resources:
@@ -92,21 +110,67 @@ services:
         cpus: "1"
         memory: 1g
 
-  history-server:
-    image: wenyixu101/history-server:latest
-    container_name: history-server
+  airflow-webserver:
+    image: wenyixu101/airflow:2.1.0
+    container_name: airflow-webserver
+    environment:
+      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
+      - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@airflow-postgres/airflow
+      - AIRFLOW__CORE__FERNET_KEY=
     ports:
-      - "18080:18080"
+      - "8090:8080"
+    command: >
+      bash -c "airflow db upgrade &&
+      airflow users create --username admin --firstname YourFirstName --lastname YourLastName --role Admin --email example@example.com --password yourpassword &&
+      airflow webserver"
+    volumes:
+      - ./dags:/opt/airflow/dags
+      - ./examples:/opt/airflow/examples
+    pull_policy: always
+    depends_on:
+      - airflow-scheduler
+    deploy:
+      resources:
+        limits:
+          cpus: "1"
+          memory: 1g
+
+  airflow-scheduler:
+    image: wenyixu101/airflow:2.1.0
+    container_name: airflow-scheduler
     environment:
-      - SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=file:/opt/data/spark-events"
-      - SPARK_MODE=history-server
-    command: "/opt/spark/bin/spark-class org.apache.spark.deploy.history.HistoryServer"
+      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
+      - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@airflow-postgres/airflow
+      - AIRFLOW__CORE__FERNET_KEY=
+    command: scheduler
     volumes:
-      - ./data/spark-events:/opt/data/spark-events
+      - ./dags:/opt/airflow/dags
+      - ./examples:/opt/airflow/examples
     pull_policy: always
+    depends_on:
+      - airflow-postgres
     deploy:
       resources:
         limits:
           cpus: "1"
           memory: 1g
+
+  postgres:
+    image: postgres:13
+    container_name: postgres
+    environment:
+      - POSTGRES_USER=airflow
+      - POSTGRES_PASSWORD=airflow
+      - POSTGRES_DB=airflow
+    ports:
+      - "5432:5432"
+    volumes:
+      - ./data/postgres:/var/lib/postgresql/data
+    pull_policy: always
+    deploy:
+      resources:
+        limits:
+          cpus: "1"
+          memory: 1g
+
\ No newline at end of file
diff --git a/docker/airflow/Dockerfile b/docker/airflow/Dockerfile
new file mode 100644
index 0000000..79d8ddb
--- /dev/null
+++ b/docker/airflow/Dockerfile
@@ -0,0 +1,3 @@
+FROM apache/airflow:2.1.0
+
+RUN pip install apache-airflow-providers-papermill
diff --git a/examples/airflow_demo.py b/examples/airflow_demo.py
new file mode 100644
index 0000000..a3b331f
--- /dev/null
+++ b/examples/airflow_demo.py
@@ -0,0 +1,11 @@
+import logging
+
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+logger = logging.getLogger(__name__)
+
+def main():
+    logger.info("Starting task...")
+    logger.info("Task completed successfully!")
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/examples/test.ipynb b/examples/demo.ipynb
similarity index 100%
rename from examples/test.ipynb
rename to examples/demo.ipynb
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..e69de29
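Two observations on the docker-compose changes. First, the new service is declared as postgres (with container_name postgres), yet both depends_on under airflow-scheduler and the host in the SQL_ALCHEMY_CONN URLs reference airflow-postgres; docker compose rejects a depends_on entry for an undefined service, so one of the two names presumably needs to be aligned (likely renaming the service to airflow-postgres). Second, an empty AIRFLOW__CORE__FERNET_KEY means Airflow stores connection passwords unencrypted, which is acceptable for a demo but worth a real key in anything shared.

Once the stack is up, the DAG files added by this patch can be sanity-checked locally. A minimal sketch, assuming apache-airflow==2.1.0 and apache-airflow-providers-papermill are installed (matching docker/airflow/Dockerfile) and that it runs from the repository root:

# Verify that both new DAG files parse without import errors.
from airflow.models import DagBag

dag_bag = DagBag(dag_folder="dags", include_examples=False)

# import_errors maps file path -> traceback; empty means both DAGs loaded cleanly.
assert not dag_bag.import_errors, dag_bag.import_errors
print(sorted(dag_bag.dag_ids))  # expected: ['SG_Resale_Flat_Prices', 'demo_dag']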