diff --git a/.github/workflows/build_apiserver.yaml b/.github/workflows/build_apiserver.yaml
new file mode 100644
index 0000000..3c709d9
--- /dev/null
+++ b/.github/workflows/build_apiserver.yaml
@@ -0,0 +1,52 @@
+name: Build API server container
+on:
+ push:
+ branches: [ main ]
+
+jobs:
+ build:
+ runs-on: ubuntu-latest
+ steps:
+ - name: Check Out Repo
+ uses: actions/checkout@v2
+
+ - name: Login to Docker Hub
+ uses: docker/login-action@v1
+ with:
+ username: ${{ secrets.DOCKER_HUB_USERNAME }}
+ password: ${{ secrets.DOCKER_HUB_ACCESS_TOKEN }}
+
+ - name: Set up Docker Buildx
+ id: buildx
+ uses: docker/setup-buildx-action@v1
+
+ - name: Build and push api-server
+ id: api-server
+ uses: docker/build-push-action@v2
+ with:
+ context: ./
+ file: ./Dockerfile.fastapi
+ push: true
+ tags: ${{ secrets.DOCKER_HUB_USERNAME }}/mlops-project:api-server-1.0
+
+ - name: Build and push prefect-worker
+ id: prefect-worker
+ uses: docker/build-push-action@v2
+ with:
+ context: ./
+ file: ./Dockerfile.prefect
+ push: true
+ tags: ${{ secrets.DOCKER_HUB_USERNAME }}/mlops-project:prefect-worker-1.0
+
+ - name: Image digest
+ run: echo ${{ steps.docker_build.outputs.digest }}
+
+ - name: Deploy
+ uses: appleboy/ssh-action@master
+ with:
+ host: ${{ secrets.REMOTE_IP }}
+ username: ${{ secrets.REMOTE_SSH_ID }}
+ port: ${{ secrets.REMOTE_SSH_PORT }}
+ key: ${{ secrets.REMOTE_SSH_KEY }}
+ script: |
+ kubectl rollout restart -f ./MLOps/k8s/prepi_deployments.yaml
diff --git a/.gitignore b/.gitignore
index 867b6d8..d9c1c5a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,3 +4,8 @@ __pycache__
tf_model/**/*
log.txt
experiments/**/temp/
+.ssl/
+prefect/atmos_tmp_pipeline/ray_mlflow
+prefect/atmos_tmp_pipeline/*.sh
+mlruns
+exp_models
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 0c2da5d..a86c82a 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -8,4 +8,5 @@ repos:
rev: 5.6.4
hooks:
- id: isort
- language_version: python3
\ No newline at end of file
+ language_version: python3
+ args: ["--profile", "black"]
\ No newline at end of file
diff --git a/Dockerfile.baseimage b/Dockerfile.baseimage
new file mode 100644
index 0000000..146c0a4
--- /dev/null
+++ b/Dockerfile.baseimage
@@ -0,0 +1,8 @@
+FROM python:3.8
+
+COPY requirements.txt /requirements.txt
+
+RUN pip install --upgrade pip &&\
+ pip install --no-cache-dir -r requirements.txt &&\
+ pip uninstall -y tensorflow==2.6 &&\
+ pip install --no-cache-dir tensorflow-cpu==2.4
\ No newline at end of file
diff --git a/Dockerfile.fastapi b/Dockerfile.fastapi
new file mode 100644
index 0000000..0c5ec49
--- /dev/null
+++ b/Dockerfile.fastapi
@@ -0,0 +1,7 @@
+FROM hl8469/mlops-project:base-image-1.0
+
+COPY . /
+
+EXPOSE 8000
+
+CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "3"]
\ No newline at end of file
diff --git a/Dockerfile.prefect b/Dockerfile.prefect
new file mode 100644
index 0000000..510768b
--- /dev/null
+++ b/Dockerfile.prefect
@@ -0,0 +1,8 @@
+FROM hl8469/mlops-project:base-image-1.0
+
+COPY ./prefect /prefect
+COPY ./set_prefect.sh /
+
+RUN prefect backend cloud
+
+CMD /set_prefect.sh
diff --git a/README.md b/README.md
index f70e538..9d2a3e2 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,140 @@
-# MLOps
-๐ Build MLOps system step by step ๐
+
MLOps project
+๐ ํ๋ก์ ํธ ๋ชฉํ ๐
+์ง์๊ฐ๋ฅํ AI ์๋น์ค๋ฅผ ์ํ MLOps System ๊ตฌ์ฑ
-## ๋ฌธ์
+[![](https://img.shields.io/static/v1?label=Python&message=3.8&color=0277bd&labelColor=gray&style=flat&logo=)](https://www.python.org/)
+[![](https://img.shields.io/static/v1?label=FastAPI&message=0.7&color=009485&labelColor=gray&style=flat&logo=)](https://fastapi.tiangolo.com/)
+[![](https://img.shields.io/static/v1?label=PostgreSQL&message=13.4&color=32668f&labelColor=gray&style=flat&logo=)](https://www.postgresql.org/)
+[![](https://img.shields.io/static/v1?label=Docker&message=20.10.11&color=2496ed&labelColor=gray&style=flat&logo=)](https://www.docker.com/)
+[![](https://img.shields.io/static/v1?label=Kubernetes&message=1.22.3&color=2e6ce6&labelColor=gray&style=flat&logo=)](https://kubernetes.io/)
+[![๋์ฑ์ด ๋ฐ๋ณด](https://img.shields.io/static/v1?label=MLflow&message=1.20.2&color=2496ed&labelColor=gray&style=flat&logo=)](https://mlflow.org/)
+[![](https://img.shields.io/static/v1?label=Prefect&message=0.15.6&color=27b1ff&labelColor=gray&style=flat&logo=)](https://docs.prefect.io/)
+[![](https://img.shields.io/static/v1?label=RAY&message=1.7.0&color=00a2e9&labelColor=gray&style=flat&logo=)](https://docs.ray.io/en/latest/)
-- [API DOCS](./docs/api-list.md)
\ No newline at end of file
+
+
+- [1. ํ๋ก์ ํธ ์๊ฐ](#1-ํ๋ก์ ํธ-์๊ฐ)
+- [2. ํ๋ก์ ํธ ์คํํด๋ณด๊ธฐ](#2-ํ๋ก์ ํธ-์คํํด๋ณด๊ธฐ)
+- [3. Phase2(2021.10.06 ~ 2021.11.13)](#3-phase220211006--20211113)
+ - [3.1. Phase2 ์ฃผ์๊ธฐ๋ฅ](#31-phase2-์ฃผ์๊ธฐ๋ฅ)
+ - [3.1.1. ์ ์ฒด ํ๋ฆ๋](#311-์ ์ฒด-ํ๋ฆ๋)
+ - [3.1.2. ์คํ๊ด๋ฆฌ ๋ฐ ๋ชจ๋ธํ์ต](#312-์คํ๊ด๋ฆฌ-๋ฐ-๋ชจ๋ธํ์ต)
+ - [3.1.3. Inference API](#313-inference-api)
+ - [3.2. Phase2 ํธ๋ฌ๋ธ์ํ
](#32-phase2-ํธ๋ฌ๋ธ์ํ
)
+- [4. Phase1(2021.08.28 ~ 2021.10.06)](#4-phase120210828--20211006)
+ - [4.1. Phase1 ์ฃผ์๊ธฐ๋ฅ](#41-phase1-์ฃผ์๊ธฐ๋ฅ)
+ - [4.2. Phase1 ํธ๋ฌ๋ธ์ํ
](#42-phase1-ํธ๋ฌ๋ธ์ํ
)
+
+
+
+
+# 1. ํ๋ก์ ํธ ์๊ฐ
+> ์ ํฌ ํ๋ก์ ํธ๋ ์ฌ๋ฌ Phase๋ก ์ด๋ฃจ์ด์ ธ ์์ต๋๋ค. ํ์ฌ ๋จ๋ฝ์์ ๊ฐ Phase์ ๋ํ์ฌ ๊ฐ๋ตํ ์ค๋ช
ํ์๊ณ , ๊ฐ ๋จ๊ณ์์ ๊ตฌํํ ๊ตฌ์ฒด์ ์ธ ๋ด์ฉ์ ๋ํ ์์ธํ ์ค๋ช
์ ์๋์ ๋ฐ๋ก ๊ธฐ์ ํ์์ต๋๋ค.
+
+
+
+**Phase 2**
+* Phase2์์๋ [MLOps level 1](https://cloud.google.com/architecture/mlops-continuous-delivery-and-automation-pipelines-in-machine-learning#mlops_level_1_ml_pipeline_automation)์ ๊ตฌํํ๊ธฐ ์ํด ๋
ธ๋ ฅํ์์ต๋๋ค.
+* Git Action๊ณผ Kubernetes๋ฅผ ์ด์ฉํ์ฌ CD/CT๋ฅผ ๋์
ํ์์ต๋๋ค.
+* hyperparameter tuning process๋ฅผ NNI์์ ray๋ฅผ ์ด์ฉํด ๊ด๋ฆฌํ๋ ๊ฒ์ผ๋ก ๋ณ๊ฒฝํ์์ต๋๋ค. ์ด๋ก์ธํ์ฌ ํ์ํ์ง ์๊ฒ๋ ์ฝ๋๋ค์ด ์๊ฒจ๋ฌ๊ณ ๋ชจ๋ deprecated๋ก ์ด๋ํ์์ต๋๋ค.
+* Workflow๊ด๋ฆฌ๋๊ตฌ๋ก prefect๋ฅผ ๋์
ํ์์ต๋๋ค.
+* predict API๋ฅผ ์ฒ์ ํธ์ถํ์์ ๋ ๋ชจ๋ธ์ ๋ก๋ฉํ๋ ์๊ฐ์ด ๊ธธ์ด์ ธ ํ๋ฒ ํธ์ถ๋ ๋ชจ๋ธ์ ์ผ์ ์๊ฐ ์บ์ฑํด๋๋๋ก ๋ณ๊ฒฝํ์์ต๋๋ค.
+* ํด๋ฌ์คํฐ ๋ชจ๋ํฐ๋ง์ ์ํด prometheus์ grafana๋ฅผ ๋์
ํ์์ต๋๋ค.
+* `2021.10.06 ~ 2021.11.13` ๊ธฐ๊ฐ๋์ ์งํ๋์์ผ๋ฉฐ ํ์ฌ ๊ฐ๋ฐ์ข
๋ฃ๋์์ต๋๋ค.
+
+
+
+**Phase 1**
+* Phase 1์์๋ [MLOps level 0](https://cloud.google.com/architecture/mlops-continuous-delivery-and-automation-pipelines-in-machine-learning#mlops_level_0_manual_process)๋ฅผ ๊ตฌํํ๊ธฐ ์ํด ๋
ธ๋ ฅํ์์ต๋๋ค.
+* NNI๋ฅผ ์ด์ฉํ์ฌ hyperparameter tuning์ ์งํํ๋ฉฐ ํ์ต๋ ๋ชจ๋ธ์ ๋ํ predict API์ ๋ชจ๋ธํ์ต ํ์ดํ๋ผ์ธ์ ์คํ์ํฌ ์ ์๋ train API๋ฅผ ์ ๊ณตํฉ๋๋ค.
+* `2021.08.28 ~ 2021.10.06` ๊ธฐ๊ฐ๋์ ์งํ๋์์ผ๋ฉฐ ํ์ฌ ๊ฐ๋ฐ์ข
๋ฃ๋์์ต๋๋ค.
+
+
+
+
+# 2. ํ๋ก์ ํธ ์คํํด๋ณด๊ธฐ
+
+[Phase2 ํ๋ก์ ํธ ์คํํด๋ณด๊ธฐ](./docs/phase2.md)
+[Phase1 ํ๋ก์ ํธ ์คํํด๋ณด๊ธฐ](./docs/phase1.md)
+
+
+
+
+# 3. Phase2(2021.10.06 ~ 2021.11.13)
+
+## 3.1. Phase2 ์ฃผ์๊ธฐ๋ฅ
+
+Phase2 ๋จ๊ณ์์๋ [MLOps level 1](https://cloud.google.com/architecture/mlops-continuous-delivery-and-automation-pipelines-in-machine-learning#mlops_level_1_ml_pipeline_automation)์ ๊ตฌํํ๊ธฐ ์ํ์ฌ ๋
ธ๋ ฅํ์์ต๋๋ค.
+
+
+
+### 3.1.1. ์ ์ฒด ํ๋ฆ๋
+![](./docs/img/phase2.png)
+
+
+
+### 3.1.2. ์คํ๊ด๋ฆฌ ๋ฐ ๋ชจ๋ธํ์ต
+**Model artifacts**
+- MLflow๋ฅผ ์ด์ฉํ์ฌ Google Cloud Storage์ ์ ์ฅํฉ๋๋ค.
+
+**์ฑ๋ฅํ๊ฐ์งํ, ์ ์, ๊ฐ ์คํ์์ ์ฌ์ฉ๋ ํ์ดํผํ๋ผ๋ฏธํฐ**
+- MLflow๋ฅผ ์ด์ฉํ์ฌ PostgreSQL์ ์ ์ฅํฉ๋๋ค.
+
+**AutoML**
+- Ray tune์ ์ด์ฉํ์ฌ ํ์ดํผํ๋ผ๋ฏธํฐ ์์นญ์ ํฉ๋๋ค.
+
+**workflow management**
+- ์ฒ์์ ํฌ๋ก ํญ์ ๊ณ ๋ คํ์์ง๋ง ์ต์ข
์ ์ผ๋ก [Prefect](https://github.com/PrefectHQ/prefect)๋ฅผ ์ฌ์ฉํ์์ต๋๋ค.
+- web UI๋ฅผ ์ ๊ณตํ์ฌ ์๋์ผ๋ก workflow๋ฅผ ์คํํ ์ ์์ผ๋ฉฐ ์คํ ๊ฒฐ๊ณผ๋ฅผ ์ฝ๊ฒ ํ์ธํ ์ ์์ต๋๋ค.
+- ์ฃผ๊ธฐ์ ์ธ ์คํ์ด ํ์ํ workflow๋ค์ ๋ฆฌ๋
์ค์ ํฌ๋ก ํญ๊ณผ ๋์ผํ ๋ฌธ๋ฒ์ผ๋ก ์ ์ํ ์ ์์ต๋๋ค.
+ex) 10 6 * * 1 โ ๋งค์ฃผ ์์์ผ 6์ 10๋ถ์ workflow ์คํ
+
+**cluster monitoring**
+- GCP์์ ์๋น์ค๋ฅผ ์ ๊ณตํ ๋น์ ํด๋ฌ์คํฐ์ ๊ฐ์ฉ์์ ๋ชจ๋ํฐ๋ง์ ์ํ์ฌ Prometheus์ Grafana๋ฅผ ๋์
ํ์์ต๋๋ค.
+- ๊ทธ๋ผํ๋์์ ์ ๊ณต์ค์ธ 13077, 315 ๋์๋ณด๋๋ฅผ ์ด์ฉํ์์ต๋๋ค.
+
+
+
+
+### 3.1.3. Inference API
+- MLํ์ดํ๋ผ์ธ์ ์ํด ์์ฑ๋ ๋ชจ๋ธ์ ์ด์ฉํ inference ๊ฒฐ๊ณผ๋ฅผ ์ ๊ณตํฉ๋๋ค.
+- API server๋ inference ์์ฒญ์ด ๋ค์ด์ค๋ฉด ํด๋น ๋ชจ๋ธ์ด ์บ์ฑ๋์ด์๋์ง ํ์ธํ ํ ์์ผ๋ฉด MLflow server์ ํต์ ํ์ฌ ๋ชจ๋ธ์ ๋ก๋ํ ํ ์์ฒญ์ฌํญ์ ์ฒ๋ฆฌํ์ฌ ๊ฒฐ๊ณผ๋ฅผ ๋ฐํํฉ๋๋ค.
+- [Fast API](https://github.com/tiangolo/fastapi)๋ฅผ ์ด์ฉํ์์ต๋๋ค.
+
+
+
+## 3.2. Phase2 ํธ๋ฌ๋ธ์ํ
+- Phase1์์๋ NNI๋ฅผ ์ฌ์ฉํ์์ง๋ง ๋ค์๊ณผ ๊ฐ์ ์ด์ ๋ก Ray Tune์ผ๋ก ๋ณ๊ฒฝํ์์ต๋๋ค. [ํด๊ฒฐํ ๋ฐฉ๋ฒ](./docs/phase2_trouble.md#NNI)
+- GCP๋ก ์๋ฒ๋ฅผ ์ด์ ํ ํ MLflow์์ ๋ชจ๋ธ์ ๋ก๋ํ๋ ์๊ฐ์ด ์ฝ 10์ด๋ก ๋งค์ฐ ๋ฆ์ด์ก์ต๋๋ค. [ํด๊ฒฐํ ๋ฐฉ๋ฒ](./docs/phase2_trouble.md#์ฐ๋ฆฌ๋์Redis๋ฅผ๋ฒ๋ ธ๋)
+
+
+
+
+
+
+# 4. Phase1(2021.08.28 ~ 2021.10.06)
+![](./docs/img/phase1.png)
+
+## 4.1. Phase1 ์ฃผ์๊ธฐ๋ฅ
+- Phase1์์๋ [MLOps level 0](https://cloud.google.com/architecture/)๋ฅผ ๊ตฌํํ๊ธฐ ์ํ์ฌ ๋
ธ๋ ฅํ์์ต๋๋ค.
+- train์ experiments ํด๋์ ๊ตฌ์ฑ๋์ด ์์ต๋๋ค.
+ - ๋ณธ ํ๋ก์ ํธ์์๋ ์ด๋์ ๋์ ์๋ํ๋ ๋ชจ์ต์ ๊ตฌํํ๊ธฐ ์ํด train์ apiํํ๋ก ์์ฒญํ ์ ์๊ฒ ๊ตฌ์ฑํ์์ต๋๋ค.
+ - train ์์ฒญ์๋ฐ๋ผ subprocess๋ก NNi๋ฅผ ์ด์ฉํ hyper parameter tuning์ ์งํํฉ๋๋ค.
+ - ๊ฐ ์คํ๊ฒฐ๊ณผ best๋ชจ๋ธ์ ํ์ฌ ์ ์ฅ๋ ๋ชจ๋ธ ์ฑ๋ฅ๊ณผ ๋น๊ตํ์ฌ db์ ์ง๋ ฌํ์์ผ ์ ์ฅํฉ๋๋ค.
+- predict๋ `app/api/router/predict.py` ์ ๊ตฌ์ฑ๋์ด ์์ต๋๋ค.
+ - prediction์์ฒญ์ ๋ฐ๋ผ ๊ฒฐ๊ณผ๋ฅผ ๋ฐํํฉ๋๋ค.
+ - temp์์ธก์ ๊ฒฝ์ฐ ์๋ฒ ์์์ ๋ชจ๋ธ์ ๋ก๋ํ์ฌ ๋ชจ๋ธ์ ๋งค๋ฒ ์ฝ์ด์ค์ง ์๋๋ก ํฉ๋๋ค.
+ - insurance์์ธก์ ๊ฒฝ์ฐ db์์ ๋งค๋ฒ ๋ชจ๋ธ์ ์ฝ์ด์ ์์ธก์ ์งํํฉ๋๋ค.
+- logger
+ - ์์ฒญ, ์คํ์งํ ๋ฑ์ log๋ก ๋จ๊น๋๋ค.
+
+## 4.2. Phase1 ํธ๋ฌ๋ธ์ํ
\ No newline at end of file
diff --git a/app/api/schemas.py b/app/api/data_class.py
similarity index 91%
rename from app/api/schemas.py
rename to app/api/data_class.py
index 73118ea..8c4429a 100644
--- a/app/api/schemas.py
+++ b/app/api/data_class.py
@@ -29,3 +29,7 @@ class ModelCorePrediction(BaseModel):
class ModelCore(ModelCoreBase):
class Config:
orm_mode = True
+
+
+class MnistData(BaseModel):
+ mnist_num: str
diff --git a/app/api/router/predict.py b/app/api/router/predict.py
index d2d5319..bf8b300 100644
--- a/app/api/router/predict.py
+++ b/app/api/router/predict.py
@@ -1,18 +1,34 @@
# -*- coding: utf-8 -*-
+import ast
+import asyncio
+import os
from typing import List
+import mlflow
import numpy as np
+import pandas as pd
+import torchvision.transforms as transforms
+import xgboost as xgb
+from dotenv import load_dotenv
from fastapi import APIRouter
from starlette.concurrency import run_in_threadpool
-from app import models
-from app.api.schemas import ModelCorePrediction
+from app import schema
+from app.api.data_class import MnistData, ModelCorePrediction
from app.database import engine
-from app.utils import ScikitLearnModel, my_model
+from app.query import SELECT_BEST_MODEL
+from app.utils import CachingModel, VarTimer, load_data, softmax
from logger import L
-models.Base.metadata.create_all(bind=engine)
+load_dotenv()
+schema.Base.metadata.create_all(bind=engine)
+
+host_url = os.getenv("MLFLOW_HOST")
+mlflow.set_tracking_uri(host_url)
+reset_sec = 5
+CLOUD_STORAGE_NAME = os.getenv("CLOUD_STORAGE_NAME")
+CLOUD_VALID_MNIST = os.getenv("CLOUD_VALID_MNIST")
router = APIRouter(
prefix="/predict",
@@ -21,66 +37,136 @@
)
-@router.put("/insurance")
-async def predict_insurance(info: ModelCorePrediction, model_name: str):
- """
- ์ ๋ณด๋ฅผ ์
๋ ฅ๋ฐ์ ๋ณดํ๋ฃ๋ฅผ ์์ธกํ์ฌ ๋ฐํํฉ๋๋ค.
-
- Args:
- info(dict): ๋ค์์ ๊ฐ๋ค์ ์
๋ ฅ๋ฐ์ต๋๋ค. age(int), sex(int), bmi(float), children(int), smoker(int), region(int)
-
- Returns:
- insurance_fee(float): ๋ณดํ๋ฃ ์์ธก๊ฐ์
๋๋ค.
- """
-
- def sync_call(info, model_name):
- """
- none sync ํจ์๋ฅผ sync๋ก ๋ง๋ค์ด ์ฃผ๊ธฐ ์ํ ํจ์์ด๋ฉฐ ์
์ถ๋ ฅ์ ๋ถ๋ชจ ํจ์์ ๊ฐ์ต๋๋ค.
- """
- model = ScikitLearnModel(model_name)
- model.load_model()
-
- info = info.dict()
- test_set = np.array([*info.values()]).reshape(1, -1)
-
- pred = model.predict_target(test_set)
- return {"result": pred.tolist()[0]}
+mnist_model = CachingModel("pytorch", 600)
+knn_model = CachingModel("sklearn", 600)
+data_lock = asyncio.Lock()
+train_df = VarTimer(600)
+
+
+@router.put("/mnist")
+async def predict_mnist(item: MnistData):
+ global train_df
+ global mnist_model, knn_model
+
+ item2 = np.array(ast.literal_eval(item.mnist_num)).astype(np.uint8)
+ model_name = "mnist"
+ model_name2 = "mnist_knn"
+ is_cloud = False
+ data_version = 1
+ exp_name = 'mnist'
+
+ if not isinstance(train_df._var, pd.DataFrame):
+ async with data_lock:
+ if not isinstance(train_df._var, pd.DataFrame):
+ df, _ = load_data(is_cloud, data_version, exp_name)
+ train_df.cache_var(df)
+
+ transform = transforms.Compose(
+ [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
+ )
+ reshaped_input = item2.reshape(28, 28)
+ transformed_input = transform(reshaped_input)
+ transformed_input = transformed_input.view(1, 1, 28, 28)
+
+ await mnist_model.get_model(model_name, load_type="production")
+ await knn_model.get_model(model_name2, load_type="production")
+
+ def sync_call(mnist_model, knn_model, train_df):
+ # Net1
+ result = mnist_model.predict(transformed_input)
+ p_res = softmax(result.detach().numpy()) * 100
+ percentage = np.around(p_res[0], 2).tolist()
+ # Net2
+ result = mnist_model.predict(transformed_input, True)
+ result = np.concatenate((result.detach().numpy(), np.array(percentage).reshape(1,-1) / 10), axis=1)
+ # KNN
+ knn_result = knn_model.predict(result)
+ xai_result = train_df.get_var().iloc[knn_result, 1:].values[0].tolist()
+ return {
+ "result": {
+ "percentage": percentage,
+ "answer": percentage.index(max(percentage)),
+ "xai_result": xai_result,
+ },
+ "error": None,
+ }
try:
- result = await run_in_threadpool(sync_call, info, model_name)
+ result = await run_in_threadpool(
+ sync_call, mnist_model, knn_model, train_df
+ )
L.info(
- f"Predict Args info: {info}\n\tmodel_name: {model_name}\n\tPrediction Result: {result}"
+ f"Predict Args info: {item.mnist_num}\n\tmodel_name: {model_name}\n\tPrediction Result: {result}\n\tcolor_avg_{result['result']['answer']}: {np.round(np.mean(item2), 2)}"
)
return result
-
except Exception as e:
L.error(e)
return {"result": "Can't predict", "error": str(e)}
-@router.put("/atmos")
-async def predict_temperature(time_series: List[float]):
+insurance_model = CachingModel("xgboost", 30)
+
+
+@router.put("/insurance")
+async def predict_insurance(info: ModelCorePrediction):
+ info = info.dict()
+ test_set = xgb.DMatrix(np.array([*info.values()]).reshape(1, -1))
+
+ model_name = "insurance"
+ await insurance_model.get_model(model_name, load_type="production")
+ result = insurance_model.predict(test_set)
+
+ result = float(result[0])
+ return {
+ "result": result,
+ "error": None,
+ }
+
+
+lock = asyncio.Lock()
+atmos_model_cache = VarTimer()
+
+
+@router.put("/atmos_temperature")
+async def predict_temperature_(time_series: List[float]):
"""
์จ๋ 1์๊ฐ ๊ฐ๊ฒฉ ์๊ณ์ด์ ์
๋ ฅ๋ฐ์ ์ดํ 24์๊ฐ ๋์์ ์จ๋๋ฅผ 1์๊ฐ ๊ฐ๊ฒฉ์ ์๊ณ์ด๋ก ์์ธกํฉ๋๋ค.
-
Args:
time_series(List): 72์๊ฐ ๋์์ 1์๊ฐ ๊ฐ๊ฒฉ ์จ๋ ์๊ณ์ด ์
๋๋ค. 72๊ฐ์ ์์๋ฅผ ๊ฐ์ ธ์ผ ํฉ๋๋ค.
-
Returns:
List[float]: ์
๋ ฅ๋ฐ์ ์๊ฐ ์ดํ 24์๊ฐ ๋์์ 1์๊ฐ ๊ฐ๊ฒฉ ์จ๋ ์์ธก ์๊ณ์ด ์
๋๋ค.
"""
+
+ global lock
+
if len(time_series) != 72:
L.error(f"input time_series: {time_series} is not valid")
return {"result": "time series must have 72 values", "error": None}
+ model_name = "atmos_tmp"
+
+ if not atmos_model_cache.is_var:
+ async with lock:
+ if not atmos_model_cache.is_var:
+ run_id = engine.execute(
+ SELECT_BEST_MODEL.format(model_name)
+ ).fetchone()[0]
+ print("start load model from mlflow")
+ atmos_model_cache.cache_var(
+ mlflow.keras.load_model(f"runs:/{run_id}/model")
+ )
+ print("end load model from mlflow")
+
def sync_pred_ts(time_series):
"""
none sync ํจ์๋ฅผ sync๋ก ๋ง๋ค์ด ์ฃผ๊ธฐ ์ํ ํจ์์ด๋ฉฐ ์
์ถ๋ ฅ์ ๋ถ๋ชจ ํจ์์ ๊ฐ์ต๋๋ค.
"""
- time_series = np.array(time_series).reshape(1, -1, 1)
- result = my_model.predict_target(time_series)
+
+ time_series = np.array(time_series).reshape(1, 72, 1)
+ result = atmos_model_cache.get_var().predict(time_series)
+ atmos_model_cache.reset_timer()
L.info(
- f"Predict Args info: {time_series.flatten().tolist()}\n\tmodel_name: {my_model.model_name}\n\tPrediction Result: {result.tolist()[0]}"
+ f"Predict Args info: {time_series.flatten().tolist()}\n\tmodel_name: {model_name}\n\tPrediction Result: {result.tolist()[0]}"
)
return {"result": result.tolist(), "error": None}
diff --git a/app/query.py b/app/query.py
index 744dd0a..ad50b45 100644
--- a/app/query.py
+++ b/app/query.py
@@ -1,102 +1,6 @@
-UPDATE_TEMP_MODEL_DATA = """
- DELETE FROM temp_model_data
- WHERE id NOT IN (
- SELECT id
- FROM temp_model_data
- WHERE experiment_name = '{}'
- ORDER BY {}
- LIMIT {}
- )
- """
-
-
-SELECT_TEMP_MODEL_BY_EXPR_NAME = """
- SELECT *
- FROM temp_model_data
- WHERE experiment_name = '{}'
- ORDER BY {};
- """
-
-
-SELECT_MODEL_METADATA_BY_EXPR_NAME = """
- SELECT *
- FROM model_metadata
- WHERE experiment_name = '{}'
- """
-
-INSERT_MODEL_CORE = """
- INSERT INTO model_core (
- model_name,
- model_file
- ) VALUES(
- '{}',
- '{}'
- )
- """
-
-INSERT_MODEL_METADATA = """
- INSERT INTO model_metadata (
- experiment_name,
- model_core_name,
- experimenter,
- version,
- train_mae,
- val_mae,
- train_mse,
- val_mse
- ) VALUES (
- '{}',
- '{}',
- '{}',
- '{}',
- '{}',
- '{}',
- '{}',
- '{}'
- )
+SELECT_BEST_MODEL = """
+ SELECT run_id
+ FROM best_model_data
+ WHERE model_name = '{}'
"""
-UPDATE_MODEL_CORE = """
- UPDATE model_core
- SET
- model_file = '{}'
- WHERE
- model_name = '{}'
- """
-
-UPDATE_MODEL_METADATA = """
- UPDATE model_metadata
- SET
- train_mae = {},
- val_mae = {},
- train_mse = {},
- val_mse = {}
- WHERE experiment_name = '{}'
- """
-
-DELETE_ALL_EXPERIMENTS_BY_EXPR_NAME = """
- DELETE FROM temp_model_data
- WHERE experiment_name = '{}'
-"""
-
-INSERT_OR_UPDATE_MODEL = """
-UPDATE model_core
-SET model_name='{mn}', model_file='{mf}'
-WHERE model_core.model_name='{mn}';
-INSERT INTO model_core (model_name, model_file)
-SELECT '{mn}', '{mf}'
-WHERE NOT EXISTS (SELECT 1
- FROM model_core as mc
- WHERE mc.model_name = '{mn}');
-"""
-
-INSERT_OR_UPDATE_SCORE = """
-UPDATE atmos_model_metadata
-SET mae='{score1}', mse='{score2}'
-WHERE atmos_model_metadata.model_name='{mn}';
-INSERT INTO atmos_model_metadata (model_name, experiment_id, mae, mse)
-SELECT '{mn}', '{expr_id}', '{score1}', '{score2}'
-WHERE NOT EXISTS (SELECT 1
- FROM atmos_model_metadata as amm
- WHERE amm.model_name = '{mn}');
-"""
diff --git a/app/schema.py b/app/schema.py
new file mode 100644
index 0000000..369e352
--- /dev/null
+++ b/app/schema.py
@@ -0,0 +1,27 @@
+# -*- coding: utf-8 -*-
+import datetime
+
+from sqlalchemy import FLOAT, Column, Integer, String
+
+from app.database import Base
+
+KST = datetime.timezone(datetime.timedelta(hours=9))
+
+
+class BestModelData(Base):
+ __tablename__ = "best_model_data"
+
+ model_name = Column(String, primary_key=True)
+ run_id = Column(String, nullable=False)
+ model_type = Column(String, nullable=False)
+ metric = Column(String, nullable=False)
+ metric_score = Column(FLOAT, nullable=False)
+
+
+class DataInfo(Base):
+ __tablename__ = "data_info"
+
+ path = Column(String, primary_key=True)
+ exp_name = Column(String)
+ version = Column(Integer)
+ data_from = Column(String)
diff --git a/app/utils/__init__.py b/app/utils/__init__.py
new file mode 100644
index 0000000..4b52211
--- /dev/null
+++ b/app/utils/__init__.py
@@ -0,0 +1,2 @@
+# from .utils import ScikitLearnModel, load_data_cloud, VarTimer, softmax
+from .utils import *
\ No newline at end of file
diff --git a/app/utils/utils.py b/app/utils/utils.py
new file mode 100644
index 0000000..6fdabd4
--- /dev/null
+++ b/app/utils/utils.py
@@ -0,0 +1,196 @@
+import asyncio
+import os
+import threading
+import time
+from io import StringIO
+
+import mlflow
+import numpy as np
+import pandas as pd
+import tensorflow as tf
+import torch
+from dotenv import load_dotenv
+from google.cloud import storage
+
+from app.database import engine
+from app.query import *
+from logger import L
+
+load_dotenv()
+
+os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
+
+base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+
+physical_devices = tf.config.list_physical_devices("GPU")
+if physical_devices:
+ tf.config.experimental.set_memory_growth(physical_devices[0], enable=True)
+
+
+def softmax(x):
+
+ f_x = np.exp(x) / np.sum(np.exp(x))
+ return f_x
+
+def get_data_path_from_db(data_version, exp_name):
+ select_query = """
+ SELECT *
+ FROM data_info
+ where version = {} and exp_name = '{}'
+ """
+ (train_path, _, _, _), (valid_path, _, _, _) = engine.execute(
+ select_query.format(data_version, exp_name)
+ ).fetchall()
+
+ return train_path, valid_path
+
+
+def load_data_cloud(bucket_name, version):
+ data_path, _ = get_data_path_from_db(version, 'mnist')
+ storage_client = storage.Client()
+ bucket = storage_client.bucket(bucket_name)
+ blob = bucket.blob(data_path)
+
+ bytes_data = blob.download_as_bytes()
+
+ s = str(bytes_data, "utf-8")
+
+ data = StringIO(s)
+ df = pd.read_csv(data)
+
+ return df
+
+def get_data_path_from_db(data_version, exp_name):
+ select_query = """
+ SELECT *
+ FROM data_info
+ where version = {} and exp_name = '{}'
+ """
+ (train_path, _, _, _), (valid_path, _, _, _) = engine.execute(
+ select_query.format(data_version, exp_name)
+ ).fetchall()
+
+ return train_path, valid_path
+
+def load_data(is_cloud, data_version, exp_name):
+
+ if is_cloud:
+ CLOUD_STORAGE_NAME = os.getenv("CLOUD_STORAGE_NAME")
+ train_path, valid_path = get_data_path_from_db(data_version, exp_name)
+ train_df = load_data_cloud(CLOUD_STORAGE_NAME, train_path)
+ valid_df = load_data_cloud(CLOUD_STORAGE_NAME, valid_path)
+ else:
+ TRAIN_MNIST = os.getenv("TRAIN_MNIST")
+ VALID_MNIST = os.getenv("VALID_MNIST")
+ train_df = pd.read_csv(TRAIN_MNIST)
+ valid_df = pd.read_csv(VALID_MNIST)
+
+ return train_df, valid_df
+
+
+class VarTimer:
+ def __init__(self, caching_time=5):
+ self._var = None
+ self._caching_time = caching_time
+ self._reset_flag = False
+
+ def cache_var(self, var, caching_time=False):
+ if caching_time:
+ self._change_timedelta(caching_time)
+ self._var = var
+ self._reset_flag = True
+ cleaner = threading.Thread(target=self._value_cleaner)
+ cleaner.start()
+
+ def _value_cleaner(self):
+ while self._reset_flag:
+ self._reset_flag = False
+ time.sleep(self._caching_time)
+ self._var = None
+
+ def get_var(self):
+ self._reset_flag = True
+ return self._var
+
+ def reset_timer(self, caching_time=False):
+ if caching_time:
+ self._change_timedelta(caching_time)
+ self._reset_flag = True
+
+ def _change_timedelta(self, caching_time):
+ if not (
+ isinstance(caching_time, int) | isinstance(caching_time, float)
+ ):
+
+ print(
+ (
+ f"timedelta must be int or float! "
+ f'"{caching_time}"(type {type(caching_time)}) isn\'t applied'
+ )
+ )
+ else:
+ self._caching_time = caching_time
+
+ @property
+ def is_var(self):
+ return True if self._var else False
+
+
+class CachingModel(VarTimer):
+ def __init__(self, model_type, caching_time=5):
+ super().__init__(caching_time)
+ self._run_id = None
+ self._model_type = model_type
+ self._lock = asyncio.Lock()
+ self._model_name = None
+ self._model_type_dict = {
+ "keras": mlflow.keras.load_model,
+ "pytorch": mlflow.pytorch.load_model,
+ "sklearn": mlflow.sklearn.load_model,
+ "xgboost": mlflow.xgboost.load_model,
+ }
+ self._model_load_way = {
+ "production": "models:/{}/Production",
+ "score": "runs:/{}/model",
+ }
+
+ def _load_run_id(self, model_name):
+ self._run_id = engine.execute(
+ SELECT_BEST_MODEL.format(model_name)
+ ).fetchone()[0]
+
+ def _load_model_mlflow(self, load_type):
+ _model_load_arg = {
+ "production": self._model_name,
+ "score": self._run_id,
+ }
+ model_uri = self._model_load_way[load_type].format(
+ _model_load_arg[load_type]
+ )
+ model = self._model_type_dict[self._model_type](model_uri)
+
+ return model
+
+ async def get_model(self, model_name, load_type="production"):
+ if not super().is_var:
+ async with self._lock:
+ if not super().is_var:
+ if load_type == "production":
+ self._model_name = model_name
+ else:
+ self._load_run_id(model_name)
+ super().cache_var(self._load_model_mlflow(load_type))
+ else:
+ super().reset_timer()
+
+ def predict(self, data, cut=False):
+ if self._model_type == "pytorch":
+ if cut:
+ return torch.nn.Sequential(
+ *list(self._var.children())[:-1]
+ ).forward(data)
+ else:
+ return self._var.forward(data)
+ else:
+ return self._var.predict(data)
+
diff --git a/deprecated/app/__init__.py b/deprecated/app/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/deprecated/app/api/__init__.py b/deprecated/app/api/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/deprecated/app/api/router/__init__.py b/deprecated/app/api/router/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/deprecated/app/api/router/predict.py b/deprecated/app/api/router/predict.py
new file mode 100644
index 0000000..8769c3c
--- /dev/null
+++ b/deprecated/app/api/router/predict.py
@@ -0,0 +1,60 @@
+# -*- coding: utf-8 -*-
+import numpy as np
+import torchvision.transforms as transforms
+from dotenv import load_dotenv
+from fastapi import APIRouter
+from starlette.concurrency import run_in_threadpool
+
+from app import schema
+from app.api.data_class import ModelCorePrediction
+from app.database import engine
+from app.utils import (
+ ScikitLearnModel,
+)
+from logger import L
+
+load_dotenv()
+
+schema.Base.metadata.create_all(bind=engine)
+
+router = APIRouter(
+ prefix="/predict",
+ tags=["predict"],
+ responses={404: {"description": "Not Found"}},
+)
+
+@router.put("/insurance")
+async def predict_insurance(info: ModelCorePrediction, model_name: str):
+ """
+ ์ ๋ณด๋ฅผ ์
๋ ฅ๋ฐ์ ๋ณดํ๋ฃ๋ฅผ ์์ธกํ์ฌ ๋ฐํํฉ๋๋ค.
+
+ Args:
+ info(dict): ๋ค์์ ๊ฐ๋ค์ ์
๋ ฅ๋ฐ์ต๋๋ค. age(int), sex(int), bmi(float), children(int), smoker(int), region(int)
+
+ Returns:
+ insurance_fee(float): ๋ณดํ๋ฃ ์์ธก๊ฐ์
๋๋ค.
+ """
+
+ def sync_call(info, model_name):
+ """
+ none sync ํจ์๋ฅผ sync๋ก ๋ง๋ค์ด ์ฃผ๊ธฐ ์ํ ํจ์์ด๋ฉฐ ์
์ถ๋ ฅ์ ๋ถ๋ชจ ํจ์์ ๊ฐ์ต๋๋ค.
+ """
+ model = ScikitLearnModel(model_name)
+ model.load_model()
+
+ info = info.dict()
+ test_set = np.array([*info.values()]).reshape(1, -1)
+
+ pred = model.predict_target(test_set)
+ return {"result": pred.tolist()[0]}
+
+ try:
+ result = await run_in_threadpool(sync_call, info, model_name)
+ L.info(
+ f"Predict Args info: {info}\n\tmodel_name: {model_name}\n\tPrediction Result: {result}"
+ )
+ return result
+
+ except Exception as e:
+ L.error(e)
+ return {"result": "Can't predict", "error": str(e)}
diff --git a/app/api/router/train.py b/deprecated/app/api/router/train.py
similarity index 100%
rename from app/api/router/train.py
rename to deprecated/app/api/router/train.py
diff --git a/deprecated/app/query.py b/deprecated/app/query.py
new file mode 100644
index 0000000..744dd0a
--- /dev/null
+++ b/deprecated/app/query.py
@@ -0,0 +1,102 @@
+UPDATE_TEMP_MODEL_DATA = """
+ DELETE FROM temp_model_data
+ WHERE id NOT IN (
+ SELECT id
+ FROM temp_model_data
+ WHERE experiment_name = '{}'
+ ORDER BY {}
+ LIMIT {}
+ )
+ """
+
+
+SELECT_TEMP_MODEL_BY_EXPR_NAME = """
+ SELECT *
+ FROM temp_model_data
+ WHERE experiment_name = '{}'
+ ORDER BY {};
+ """
+
+
+SELECT_MODEL_METADATA_BY_EXPR_NAME = """
+ SELECT *
+ FROM model_metadata
+ WHERE experiment_name = '{}'
+ """
+
+INSERT_MODEL_CORE = """
+ INSERT INTO model_core (
+ model_name,
+ model_file
+ ) VALUES(
+ '{}',
+ '{}'
+ )
+ """
+
+INSERT_MODEL_METADATA = """
+ INSERT INTO model_metadata (
+ experiment_name,
+ model_core_name,
+ experimenter,
+ version,
+ train_mae,
+ val_mae,
+ train_mse,
+ val_mse
+ ) VALUES (
+ '{}',
+ '{}',
+ '{}',
+ '{}',
+ '{}',
+ '{}',
+ '{}',
+ '{}'
+ )
+"""
+
+UPDATE_MODEL_CORE = """
+ UPDATE model_core
+ SET
+ model_file = '{}'
+ WHERE
+ model_name = '{}'
+ """
+
+UPDATE_MODEL_METADATA = """
+ UPDATE model_metadata
+ SET
+ train_mae = {},
+ val_mae = {},
+ train_mse = {},
+ val_mse = {}
+ WHERE experiment_name = '{}'
+ """
+
+DELETE_ALL_EXPERIMENTS_BY_EXPR_NAME = """
+ DELETE FROM temp_model_data
+ WHERE experiment_name = '{}'
+"""
+
+INSERT_OR_UPDATE_MODEL = """
+UPDATE model_core
+SET model_name='{mn}', model_file='{mf}'
+WHERE model_core.model_name='{mn}';
+INSERT INTO model_core (model_name, model_file)
+SELECT '{mn}', '{mf}'
+WHERE NOT EXISTS (SELECT 1
+ FROM model_core as mc
+ WHERE mc.model_name = '{mn}');
+"""
+
+INSERT_OR_UPDATE_SCORE = """
+UPDATE atmos_model_metadata
+SET mae='{score1}', mse='{score2}'
+WHERE atmos_model_metadata.model_name='{mn}';
+INSERT INTO atmos_model_metadata (model_name, experiment_id, mae, mse)
+SELECT '{mn}', '{expr_id}', '{score1}', '{score2}'
+WHERE NOT EXISTS (SELECT 1
+ FROM atmos_model_metadata as amm
+ WHERE amm.model_name = '{mn}');
+"""
diff --git a/app/models.py b/deprecated/app/schema.py
similarity index 100%
rename from app/models.py
rename to deprecated/app/schema.py
diff --git a/deprecated/app/utils/__init__.py b/deprecated/app/utils/__init__.py
new file mode 100644
index 0000000..4b52211
--- /dev/null
+++ b/deprecated/app/utils/__init__.py
@@ -0,0 +1,2 @@
+# from .utils import ScikitLearnModel, load_data_cloud, VarTimer, softmax
+from .utils import *
\ No newline at end of file
diff --git a/app/utils.py b/deprecated/app/utils/utils.py
similarity index 99%
rename from app/utils.py
rename to deprecated/app/utils/utils.py
index 340571e..a4f5da0 100644
--- a/app/utils.py
+++ b/deprecated/app/utils/utils.py
@@ -1,10 +1,8 @@
import codecs
import glob
import io
-import multiprocessing
import os
import pickle
-import re
import shutil
import socketserver
import subprocess
@@ -13,11 +11,14 @@
import tensorflow as tf
import yaml
+from dotenv import load_dotenv
from app.database import engine
from app.query import *
from logger import L
+load_dotenv()
+
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
@@ -119,10 +120,6 @@ def load_model(self):
self.model = tf.keras.models.load_model(model_path)
-my_model = TensorFlowModel("test_model")
-my_model.load_model()
-
-
def write_yml(path, experiment_name, experimenter, model_name, version):
"""
NNI ์คํ์ ์์ํ๊ธฐ ์ํ config.ymlํ์ผ์ ์์ฑํ๋ ํจ์ ์
๋๋ค.
@@ -538,4 +535,4 @@ def modelfile_cleaner(self):
"""
model_path = os.path.join(self.experiment_path, "temp", "*")
exprs = glob.glob(model_path)
- [shutil.rmtree(_) for _ in exprs]
+ [shutil.rmtree(_) for _ in exprs]
\ No newline at end of file
diff --git a/experiments/atmos_tmp_01/config.yml b/deprecated/experiments/atmos_tmp_01/config.yml
similarity index 100%
rename from experiments/atmos_tmp_01/config.yml
rename to deprecated/experiments/atmos_tmp_01/config.yml
diff --git a/experiments/atmos_tmp_01/preprocessing.py b/deprecated/experiments/atmos_tmp_01/preprocessing.py
similarity index 100%
rename from experiments/atmos_tmp_01/preprocessing.py
rename to deprecated/experiments/atmos_tmp_01/preprocessing.py
diff --git a/experiments/atmos_tmp_01/search_space.json b/deprecated/experiments/atmos_tmp_01/search_space.json
similarity index 100%
rename from experiments/atmos_tmp_01/search_space.json
rename to deprecated/experiments/atmos_tmp_01/search_space.json
diff --git a/experiments/atmos_tmp_01/train.py b/deprecated/experiments/atmos_tmp_01/train.py
similarity index 100%
rename from experiments/atmos_tmp_01/train.py
rename to deprecated/experiments/atmos_tmp_01/train.py
diff --git a/experiments/expr_db.py b/deprecated/experiments/expr_db.py
similarity index 100%
rename from experiments/expr_db.py
rename to deprecated/experiments/expr_db.py
diff --git a/experiments/insurance/config.yml b/deprecated/experiments/insurance/config.yml
similarity index 100%
rename from experiments/insurance/config.yml
rename to deprecated/experiments/insurance/config.yml
diff --git a/experiments/insurance/query.py b/deprecated/experiments/insurance/query.py
similarity index 100%
rename from experiments/insurance/query.py
rename to deprecated/experiments/insurance/query.py
diff --git a/experiments/insurance/search_space.json b/deprecated/experiments/insurance/search_space.json
similarity index 100%
rename from experiments/insurance/search_space.json
rename to deprecated/experiments/insurance/search_space.json
diff --git a/experiments/insurance/trial.py b/deprecated/experiments/insurance/trial.py
similarity index 100%
rename from experiments/insurance/trial.py
rename to deprecated/experiments/insurance/trial.py
diff --git a/docs/img/api-test.png b/docs/img/api-test.png
new file mode 100644
index 0000000..e648f24
Binary files /dev/null and b/docs/img/api-test.png differ
diff --git a/docs/img/nni.png b/docs/img/nni.png
new file mode 100644
index 0000000..d6b8c11
Binary files /dev/null and b/docs/img/nni.png differ
diff --git a/docs/img/phase1.png b/docs/img/phase1.png
new file mode 100644
index 0000000..846e213
Binary files /dev/null and b/docs/img/phase1.png differ
diff --git a/docs/img/phase2.png b/docs/img/phase2.png
new file mode 100644
index 0000000..e47a511
Binary files /dev/null and b/docs/img/phase2.png differ
diff --git a/docs/img/redis_pytorch_time.png b/docs/img/redis_pytorch_time.png
new file mode 100644
index 0000000..7526154
Binary files /dev/null and b/docs/img/redis_pytorch_time.png differ
diff --git a/docs/phase1.md b/docs/phase1.md
new file mode 100644
index 0000000..2712c12
--- /dev/null
+++ b/docs/phase1.md
@@ -0,0 +1,35 @@
+# Phase1 ์์ธ
+
+- [๋ฐํ์๋ฃ](https://docs.google.com/presentation/d/16cQSK4t3O86uMFg6iEr02MrtNUQf95RTcTleAAR44fY)
+
+## How to enter this project
+
+### data & env
+
+- postgres db ๋ฅผ ์ค๋นํฉ๋๋ค.
+ - `docker run -d --name postgre -p 5432:5432 -e POSTGRES_PASSWORD= postgres:13.4`
+- [Data](https://drive.google.com/file/d/1YPOPA1jnXFyJvl6ikThejvVnxOJl9ya5/view?usp=sharing)๋ฅผ ๋ค์ด๋ก๋ ๋ฐ์ postgresql db์ ๋ฃ์ด์ค๋๋ค.
+ - `docker cp ํด๋/๊ฒฝ๋ก/postgres_20211026.sql> :/postgres_20211026.sql`
+ - `docker exec -it bash`
+ - `psql postgres < /postgres_20211026.sql`
+ - ๋ง์ฝ ์ปจํ
์ด๋์์ role "root" does not exist ์๋ฌ๊ฐ ๋๋ค๋ฉด `su -l postgres` ๋ก ์ ์ ๋ฅผ ๋ณ๊ฒฝํ ํ์ ์์
ํด ์ฃผ์ธ์
+- enviornment variable ๋ฅผ .envํ์ผ์ ํฌํจ์์ผ ์ค๋๋ค.
+ ```plain
+ POSTGRES_USER=postgres
+ POSTGRES_PASSWORD=0000
+ POSTGRES_SERVER=localhost
+ POSTGRES_PORT=5432
+ POSTGRES_DB=postgres
+ ```
+
+### Project
+
+0. data&env ๋จ๊ณ๋ฅผ ์ํํฉ๋๋ค.
+1. [Phase1](https://github.com/State-of-The-MLOps/MLOps/releases/tag/v1.0.0) ์์ Source์ฝ๋๋ฅผ ๋ค์ด๋ฐ์ต๋๋ค.
+2. `conda create --name mlops-phase1 python=3.8`
+3. `conda activate mlops-phase1`
+4. `pip install -r requirements.txt` ๋ก ํ์ํ ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ์ค์นํฉ๋๋ค.
+5. `python main.py` ๋ก ์๋ฒ๋ฅผ ์คํ์ํต๋๋ค.
+6. http://localhost:8000/docs ์์ fastapi swagger๋ฅผ ํตํด api๋ฅผ ํ
์คํธํฉ๋๋ค.
+
+## Review
diff --git a/docs/phase2.md b/docs/phase2.md
new file mode 100644
index 0000000..e98d00a
--- /dev/null
+++ b/docs/phase2.md
@@ -0,0 +1,124 @@
+# Phase2 ์์ธ
+
+- [๋ฐํ์๋ฃ](https://docs.google.com/presentation/d/1TC_wMWykpN7QATgJnGuMVkwP_cm4PjEe3M3L57RuAfY/edit#slide=id.gf0d4a04c0e_2_75)
+
+# On premise
+
+## data & env
+
+- [๋งํฌ](https://drive.google.com/drive/folders/16BYXTck28c4Lvz8ps31atB8zfaBtHlW0?usp=sharing)์์ mnist data๋ฅผ ๋ค์ด๋ฐ์ ์ค๋นํฉ๋๋ค.
+- postgres๋ [phase1์ํ](phase1.md)์ ๊ฐ์ด ์ค๋น๋์ด์ผ ํฉ๋๋ค.
+- enviorment variable๋ฅผ .envํ์ผ์ ํฌํจ์์ผ ์ค๋๋ค.
+```plain
+POSTGRES_USER=postgres
+POSTGRES_PASSWORD=0000
+POSTGRES_SERVER=localhost
+POSTGRES_PORT=5431
+POSTGRES_DB=postgres
+MLFLOW_HOST=http://localhost:5000
+TRAIN_MNIST=/์ ๋/๊ฒฝ๋ก/mnist_train.csv
+VALID_MNIST=/์ ๋/๊ฒฝ๋ก/mnist_valid.csv
+```
+
+## project
+
+0. data&env ๋จ๊ณ๋ฅผ ์ํํฉ๋๋ค.
+1. ์์ค์ฝ๋๋ฅผ ๋ค์ด๋ฐ์ต๋๋ค.
+2. `conda create --name mlops-phase1 python=3.8`
+3. `conda activate mlops-phase1`
+4. `bash requirements.sh`๋ก ํ์ํ ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ์ค์นํฉ๋๋ค.
+ * requirements.txt๋ก ์ค์นํด๋ณธ ๊ฒฐ๊ณผ tfdv๋ฌธ์ ๋๋ฌธ์ ์ค์น๊ฐ ์ํํ์ง ์์ต๋๋ค.
+ * ์ค์น๊ฐ ์ํํ์ง ์๋ค๋ฉด shํ์ผ์ ๋ช
๋ น์ด๋ฅผ ๋ณต์ฌํด์ ์ปค๋งจ๋ ์ฐฝ์์ ์ค์นํด์ค๋๋ค. (ex ์๋์ฐํ๊ฒฝ)
+5. fast api server๋ฅผ ์คํ์ํต๋๋ค.
+ * `python main.py`
+6. mlflow server๋ฅผ ์คํ์ํต๋๋ค
+ * `mlflow server --backend-store-uri postgresql://:@localhost:5432/postgres --default-artifact-root <์ ์ฅ ๊ฒฝ๋ก>`
+7. prefect๋ฅผ ์คํํด ์ค๋๋ค.
+ 1. `python prefect/mnist/main.py` : mnist pipeline ์ถ๊ฐ
+ 2. `prefect agent local start`
+ - prefect ์คํ์ mnist์ is_cloud ํ๋ผ๋ฏธํฐ๋ฅผ False๋ก ๋ณ๊ฒฝํด์ค๋๋ค.
+
+# k8s
+
+## data & env
+
+### data
+
+- data ์ค๋น
+
+ google cloud storage๋ฅผ ์ฐ์ง ์์ ๊ฒฝ์ฐ
+
+
+ ```python
+ import gdown
+
+ google_path = 'https://drive.google.com/uc?id='
+ file_id = '115LZXgZA6gPQvf5FPI1b0nsnhNz5mzH0'
+ output_name = 'data_mnist_train.csv'
+ gdown.download(google_path+file_id,output_name,quiet=False)
+ google_path = 'https://drive.google.com/uc?id='
+ file_id = '1ExfRt-4YfbP8gOAXfudlR6Lt7PbPhJzs'
+ output_name = 'data_mnist_valid.csv'
+ gdown.download(google_path+file_id,output_name,quiet=False)
+ ```
+
+
+
+
+
+
+ google cloud storage๋ฅผ ์ฌ์ฉํ ๊ฒฝ์ฐ
+
+
+ ```python
+ def insert_info():
+ insert_q = """
+ INSERT INTO data_info (
+ path,
+ exp_name,
+ version,
+ data_from
+ ) VALUES (
+ '{}',
+ '{}',
+ {},
+ '{}'
+ )
+ """
+
+ engine.execute(insert_q.format(
+ 'data/mnist_train.csv',
+ 'mnist',
+ 1,
+ 'mnist_company'
+ ))
+ engine.execute(insert_q.format(
+ 'data/mnist_valid.csv',
+ 'mnist',
+ 1,
+ 'mnist_company'
+ ))
+
+ insert_info()
+ ```
+
+ - google cloud storage์ choonsik-storage ์ด๋ฆ์ผ๋ก bucket์์ฑ (๋ค๋ฅธ์ด๋ฆ์ผ ๊ฒฝ์ฐ configmap.yaml ์์ ํ์)
+ - dataํด๋ ์๋์ ๋ฐ์ดํฐ ์ ์ฅ (`configmap` : CLOUD_TRAIN_MNIST: data/mnist_train.csv)
+ - db์ cloud storage์ ์๋ data์ ๋ํ ์ ๋ณด ๊ธฐ๋ก
+
+
+
+### kubernetes secret
+
+- atmos-api-key : ์จ๋์ ๋ณด๋ฅผ ๋ฐ์์ค๋ api key
+- prefect-config : [๋งํฌ](https://cloud.prefect.io/user/keys) ์์ key ๋ฐ๊ธํ ~/.prefect/config.toml ์ ๊ธฐ๋ก ([์ฐธ๊ณ ](https://docs.prefect.io/orchestration/concepts/api_keys.html#using-api-keys))
+- psql-passwd : postgresql password
+- [๋งํฌ](https://cloud.google.com/docs/authentication/getting-started)๋ฅผ ์ฐธ๊ณ ํ์ฌ service-account-file์ ๋ฐ๊ธ๋ฐ๊ณ k8s secret์ผ๋ก ๊ด๋ฆฌ
+
+## Project
+
+0. data&env ๋จ๊ณ๋ฅผ ์ํํฉ๋๋ค.
+1. `cd k8s && kubectl apply -k kustomization.yaml`
+
+์ฐธ๊ณ : [frontend](https://github.com/ehddnr301/mnist_test_FE)
+# Review
diff --git a/docs/phase2_trouble.md b/docs/phase2_trouble.md
new file mode 100644
index 0000000..7228514
--- /dev/null
+++ b/docs/phase2_trouble.md
@@ -0,0 +1,42 @@
+# ์ฝ์ง๋ก
+
+## NNI vs Ray ์ฃผ๊ด๋น๊ต
+
+### NNI
+
+
+
+- ํ์ดํผ ํ๋ผ๋ฏธํฐ ์์นญ์ ์ํด ์ ์ผ ์ฒ์ ์ฌ์ฉํ NNi ์
๋๋ค.
+- ์ฅ์
+ - ํ์ผํํ๋ก ์์นญ์ ํ์ํ config.yaml๊ณผ search_space.json ํ์ผ์ ์์ฑํ๊ธฐ๋๋ฌธ์ ์ฝ์ต๋๋ค.
+ - ๋ณ๋ค๋ฅธ ์์
์์ด๋ ์๋์ ์ผ๋ก dashboard๋ฅผ ์ ๊ณตํฉ๋๋ค.
+- ๋จ์ (์ฅ์ ์ด์๋ ๋ถ๋ถ๋ค์ด ๋จ์ ์ด ๋์์ต๋๋ค.)
+ - python code๋ก๋ง ๊ด๋ฆฌ๋์์ผ๋ฉด ํ๋๋ฐ ๋ถํ์ํ ํ์ผ์์ฑ์ด ํ์ํ์ต๋๋ค.
+ - webui๋ฅผ ์คํ ์คํ๋ ์๋์ผ๋ก ๋์ฐ๋๋ฐ ์ด๋ฅผ ๋ฐฐ์ ํ๋ ์ต์
์ด ์กด์ฌํ์ง ์์์ต๋๋ค.
+
+
+
+
+### NNI
+
+
+
+- ํ์ดํผ ํ๋ผ๋ฏธํฐ ์์นญ์ ์ํด ์ ์ผ ์ฒ์ ์ฌ์ฉํ NNi ์
๋๋ค.
+- ์ฅ์
+ - ํ์ผํํ๋ก ์์นญ์ ํ์ํ config.yaml๊ณผ search_space.json ํ์ผ์ ์์ฑํ๊ธฐ๋๋ฌธ์ ์ฝ์ต๋๋ค.
+ - ๋ณ๋ค๋ฅธ ์์
์์ด๋ ์๋์ ์ผ๋ก dashboard๋ฅผ ์ ๊ณตํฉ๋๋ค.
+- ๋จ์ (์ฅ์ ์ด์๋ ๋ถ๋ถ๋ค์ด ๋จ์ ์ด ๋์์ต๋๋ค.)
+ - python code๋ก๋ง ๊ด๋ฆฌ๋์์ผ๋ฉด ํ๋๋ฐ ๋ถํ์ํ ํ์ผ์์ฑ์ด ํ์ํ์ต๋๋ค.
+ - webui๋ฅผ ์คํ ์คํ๋ ์๋์ผ๋ก ๋์ฐ๋๋ฐ ์ด๋ฅผ ๋ฐฐ์ ํ๋ ์ต์
์ด ์กด์ฌํ์ง ์์์ต๋๋ค.
+
+
+
+
+
+## ์ฐ๋ฆฌ๋ ์ Redis๋ฅผ ๋ฒ๋ ธ๋
+![](./img/api-test.png)
+
+
+- ์๋นํ ์์ ๋ชจ๋ธ์์๋ ๋ถ๊ตฌํ๊ณ redis์ ๋ฃ๊ธฐ์ serializeํ๋ ์์
๊ณผ
+- redis์์ ๊บผ๋ด์ deserializeํ๋ ์์
์ด ์ค๋ ๊ฑธ๋ฆฝ๋๋ค.
+- ์ด๋ฅผ ํด๊ฒฐํ๊ธฐ ์ํด ๋ฉ๋ชจ๋ฆฌ์ ๋ชจ๋ธ์ ๊ทธ๋๋ก ์ฌ๋ฆฌ๊ณ ์ผ์ ์๊ฐ์ด ์ง๋๋ฉด ์ง์์ฃผ๋ ์ฝ๋๋ฅผ ์ฌ์ฉํ๊ฒ ๋์์ต๋๋ค.
diff --git a/k8s/configmap.yaml b/k8s/configmap.yaml
new file mode 100644
index 0000000..e75a688
--- /dev/null
+++ b/k8s/configmap.yaml
@@ -0,0 +1,14 @@
+apiVersion: v1
+kind: ConfigMap
+metadata:
+ name: config-map
+data:
+ POSTGRES_USER: postgres
+ POSTGRES_SERVER: pgdb
+ POSTGRES_PORT: "5432"
+ POSTGRES_DB: postgres
+ MLFLOW_HOST: http://mlflow-service:5000
+ GOOGLE_APPLICATION_CREDENTIALS: /secret/service-account-file.json
+ CLOUD_STORAGE_NAME: choonsik-storage
+ CLOUD_TRAIN_MNIST: data/mnist_train.csv
+ CLOUD_VALID_MNIST: data/mnist_valid.csv
diff --git a/k8s/deployments.yaml b/k8s/deployments.yaml
new file mode 100644
index 0000000..a9c7f95
--- /dev/null
+++ b/k8s/deployments.yaml
@@ -0,0 +1,156 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: postgresql-app
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: postgres-server
+ template:
+ metadata:
+ name: postgresql
+ labels:
+ app: postgres-server
+ spec:
+ containers:
+ - name: postgresql
+ image: hl8469/mlops-project:postgre
+ imagePullPolicy: "Always"
+ volumeMounts:
+ - mountPath: /var/lib/postgresql/data
+ name: task-pv-storage
+ ports:
+ - containerPort: 5432
+ protocol: TCP
+ volumes:
+ - name: task-pv-storage
+ persistentVolumeClaim:
+ claimName: task-pv-claim
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: mlflow-app
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: mlflow-server
+ template:
+ metadata:
+ name: mlflow
+ labels:
+ app: mlflow-server
+ spec:
+ containers:
+ - name: mlflow
+ image: hl8469/mlops-project:mlflow-server-1.0
+ imagePullPolicy: "Always"
+ ports:
+ - containerPort: 5000
+ protocol: TCP
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: fastapi-app
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: api-server
+ template:
+ metadata:
+ name: fastapi
+ labels:
+ app: api-server
+ spec:
+ containers:
+ - name: fastapi-backend
+ image: hl8469/mlops-project:api-server-1.0
+ imagePullPolicy: "Always"
+ envFrom:
+ - secretRef:
+ name: psql-passwd
+ - configMapRef:
+ name: config-map
+ volumeMounts:
+ - name: gcs
+ mountPath: /secret/service-account-file.json
+ subPath: service-account-file.json
+ readOnly: true
+ ports:
+ - containerPort: 8000
+ protocol: TCP
+
+ volumes:
+ - name: gcs
+ secret:
+ secretName: service-account-file
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: prefect-app
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: prefect-service
+ template:
+ metadata:
+ name: prefect
+ labels:
+ app: prefect-service
+ spec:
+ containers:
+ - name: prefect-worker
+ image: hl8469/mlops-project:prefect-worker-1.0
+ imagePullPolicy: "Always"
+ envFrom:
+ - secretRef:
+ name: psql-passwd
+ - secretRef:
+ name: atmos-api-key
+ - configMapRef:
+ name: config-map
+ volumeMounts:
+ - name: gcs
+ mountPath: /secret/service-account-file.json
+ subPath: service-account-file.json
+ readOnly: true
+ - name: prefect-cfg
+ mountPath: /root/.prefect/config.toml
+ subPath: config.toml
+
+ volumes:
+ - name: gcs
+ secret:
+ secretName: service-account-file
+ - name: prefect-cfg
+ secret:
+ secretName: prefect-config
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: front
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: front-app
+ template:
+ metadata:
+ name: front
+ labels:
+ app: front-app
+ spec:
+ containers:
+ - name: front-worker
+ image: ehddnr/mnist_front:1.0.0
+ imagePullPolicy: "Always"
+ ports:
+ - containerPort: 3000
+ protocol: TCP
diff --git a/k8s/kustomization.yaml b/k8s/kustomization.yaml
new file mode 100644
index 0000000..11277e7
--- /dev/null
+++ b/k8s/kustomization.yaml
@@ -0,0 +1,7 @@
+apiVersion: kustomize.config.k8s.io/v1beta1
+kind: Kustomization
+resources:
+ - service.yaml
+ - deployments.yaml
+ - configmap.yaml
+ - pv-pvc.yaml
diff --git a/k8s/prepi_deployments.yaml b/k8s/prepi_deployments.yaml
new file mode 100644
index 0000000..c0fa5b5
--- /dev/null
+++ b/k8s/prepi_deployments.yaml
@@ -0,0 +1,82 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: fastapi-app
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: api-server
+ template:
+ metadata:
+ name: fastapi
+ labels:
+ app: api-server
+ spec:
+ containers:
+ - name: fastapi-backend
+ image: hl8469/mlops-project:api-server-1.0
+ imagePullPolicy: "Always"
+ envFrom:
+ - secretRef:
+ name: psql-passwd
+ - configMapRef:
+ name: config-map
+ volumeMounts:
+ - name: gcs
+ mountPath: /secret/service-account-file.json
+ subPath: service-account-file.json
+ readOnly: true
+ ports:
+ - containerPort: 8000
+ protocol: TCP
+
+ volumes:
+ - name: gcs
+ secret:
+ secretName: service-account-file
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: prefect-app
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: prefect-service
+ template:
+ metadata:
+ name: prefect
+ labels:
+ app: prefect-service
+ spec:
+ containers:
+ - name: prefect-worker
+ image: hl8469/mlops-project:prefect-worker-1.0
+ imagePullPolicy: "Always"
+ envFrom:
+ - secretRef:
+ name: psql-passwd
+ - secretRef:
+ name: atmos-api-key
+ - configMapRef:
+ name: config-map
+ volumeMounts:
+ - name: gcs
+ mountPath: /secret/service-account-file.json
+ subPath: service-account-file.json
+ readOnly: true
+ - name: prefect-cfg
+ mountPath: /root/.prefect/config.toml
+ subPath: config.toml
+
+ volumes:
+ - name: gcs
+ secret:
+ secretName: service-account-file
+ - name: prefect-cfg
+ secret:
+ secretName: prefect-config
+ nodeSelector:
+ kubernetes.io/hostname: cd-test-3
\ No newline at end of file
diff --git a/k8s/pv-pvc.yaml b/k8s/pv-pvc.yaml
new file mode 100644
index 0000000..8296d2a
--- /dev/null
+++ b/k8s/pv-pvc.yaml
@@ -0,0 +1,24 @@
+apiVersion: v1
+kind: PersistentVolume
+metadata:
+ name: task-pv-volume
+spec:
+ storageClassName: postgres-data
+ capacity:
+ storage: 3Gi
+ accessModes:
+ - ReadWriteMany
+ hostPath:
+ path: /home/choonsik/pg_data
+---
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+ name: task-pv-claim
+spec:
+ storageClassName: postgres-data
+ accessModes:
+ - ReadWriteMany
+ resources:
+ requests:
+ storage: 3Gi
\ No newline at end of file
diff --git a/k8s/service.yaml b/k8s/service.yaml
new file mode 100644
index 0000000..6368107
--- /dev/null
+++ b/k8s/service.yaml
@@ -0,0 +1,54 @@
+apiVersion: v1
+kind: Service
+metadata:
+ name: mlops-service
+spec:
+ ports:
+ - name: web-port
+ port: 8000
+ targetPort: 8000
+ nodePort: 32105
+ selector:
+ app: api-server
+ type: NodePort
+---
+apiVersion: v1
+kind: Service
+metadata:
+ name: mlflow-service
+spec:
+ ports:
+ - name: mlflow-port
+ port: 5000
+ targetPort: 5000
+ nodePort: 32205
+ selector:
+ app: mlflow-server
+ type: NodePort
+---
+apiVersion: v1
+kind: Service
+metadata:
+ name: pgdb
+spec:
+ ports:
+ - name: db-port
+ port: 5432
+ targetPort: 5432
+ selector:
+ app: postgres-server
+ type: ClusterIP
+---
+apiVersion: v1
+kind: Service
+metadata:
+ name: front-service
+spec:
+ ports:
+ - name: front-port
+ port: 3000
+ targetPort: 3000
+ nodePort: 32222
+ selector:
+ app: front-app
+ type: NodePort
\ No newline at end of file
diff --git a/main.py b/main.py
index d0f3693..3d479f1 100644
--- a/main.py
+++ b/main.py
@@ -2,7 +2,7 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
-from app.api.router import predict, train
+from app.api.router import predict
app = FastAPI()
@@ -17,12 +17,11 @@
)
app.include_router(predict.router)
-app.include_router(train.router)
@app.get("/")
def hello_world():
- return {"message": "Hello World"}
+ return {"message": "Hi I'm choonsik!"}
if __name__ == "__main__":
diff --git a/prefect/atmos_tmp_pipeline/main.py b/prefect/atmos_tmp_pipeline/main.py
new file mode 100644
index 0000000..51769b3
--- /dev/null
+++ b/prefect/atmos_tmp_pipeline/main.py
@@ -0,0 +1,8 @@
+from pipeline import atmos_ETL
+from prefect import Client
+
+
+if __name__ == '__main__':
+ Client().create_project(project_name='atmos_test')
+ Pipeline = atmos_ETL("atmos_test", "atmos_mlrt", "0 */6 * * *")
+ Pipeline.create_flow()
\ No newline at end of file
diff --git a/prefect/atmos_tmp_pipeline/pipeline.py b/prefect/atmos_tmp_pipeline/pipeline.py
new file mode 100644
index 0000000..fa2a728
--- /dev/null
+++ b/prefect/atmos_tmp_pipeline/pipeline.py
@@ -0,0 +1,83 @@
+import prefect
+from prefect import Flow
+from task import *
+from prefect.schedules import Schedule
+from prefect.schedules.clocks import CronClock
+from prefect.run_configs import LocalRun
+
+
+class atmos_ETL:
+ _project_name = None
+ _flow_name = None
+ _logger = None
+ _flow = None
+
+
+ def __init__(self, project_name, flow_name, schedule=None):
+ self._logger = prefect.context.get("logger")
+ self._project_name = project_name
+ self._flow_name = flow_name
+ self._schedule = schedule
+
+ def create_flow(self):
+ self._logger.info(f"Create {self._flow_name} flow")
+
+ with Flow(self._flow_name) as flow:
+ self._logger.info('start data extract')
+
+ host_url = os.getenv('MLFLOW_HOST')
+ exp_name = "atmos_tmp"
+ metric = "mae"
+ model_type = "tensorflow"
+ num_trials = 10
+
+ extr_result = data_extract(os.getenv('ATMOS_API_KEY'))
+ valid_result = data_validation(extr_result[1])
+ load_data = data_load_to_db(valid_result[1],
+ os.getenv("POSTGRES_USER"),
+ os.getenv("POSTGRES_SERVER"),
+ os.getenv("POSTGRES_PASSWORD"))
+ is_end = train_mlflow_ray(load_data,
+ host_url,
+ exp_name,
+ metric,
+ num_trials)
+ log_best_model(is_end, host_url, exp_name, metric, model_type)
+
+ flow.run_config = LocalRun(working_dir="prefect/atmos_tmp_pipeline")
+ self._flow = flow
+ self._register()
+
+
+ def _register(self):
+ self._logger.info(
+ f"Regist {self._flow_name} flow to {self._project_name} project"
+ )
+ self._logger.info(f"Set Cron {self._schedule}")
+
+ if self._schedule:
+ self._set_cron()
+
+ self._flow.register(
+ project_name=self._project_name,
+ idempotency_key=self.flow.serialized_hash()
+ )
+
+
+ def _set_cron(self):
+ schedule = Schedule(clocks=[CronClock(self._schedule)])
+ self._flow.schedule = schedule
+
+
+ @property
+ def flow(self):
+ return self._flow
+
+ @property
+ def project_name(self):
+ return self._project_name
+
+ @property
+ def flow_name(self):
+ return self._flow_name
+
diff --git a/prefect/atmos_tmp_pipeline/query.py b/prefect/atmos_tmp_pipeline/query.py
new file mode 100644
index 0000000..8322b5e
--- /dev/null
+++ b/prefect/atmos_tmp_pipeline/query.py
@@ -0,0 +1,32 @@
+INSERT_BEST_MODEL = """
+ INSERT INTO best_model_data (
+ model_name,
+ run_id,
+ model_type,
+ metric,
+ metric_score
+ ) VALUES (
+ '{}',
+ '{}',
+ '{}',
+ '{}',
+ {}
+ )
+ """
+
+SELECT_EXIST_MODEL = """
+ SELECT *
+ FROM best_model_data
+ WHERE model_name = '{}'
+"""
+
+UPDATE_BEST_MODEL = """
+ UPDATE best_model_data
+ SET
+ run_id = '{}',
+ model_type = '{}',
+ metric = '{}',
+ metric_score = {}
+ WHERE
+ model_name = '{}'
+"""
diff --git a/prefect/atmos_tmp_pipeline/task.py b/prefect/atmos_tmp_pipeline/task.py
new file mode 100644
index 0000000..b94f8f5
--- /dev/null
+++ b/prefect/atmos_tmp_pipeline/task.py
@@ -0,0 +1,193 @@
+import requests
+import os
+from utils import *
+import sqlalchemy
+import pandas as pd
+from datetime import timedelta
+import tensorflow_data_validation as tfdv
+import prefect
+from prefect import task
+from prefect.tasks.prefect.flow_run_cancel import CancelFlowRun
+import mlflow
+
+
+@task
+def data_extract(api_key):
+ logger = prefect.context.get("logger")
+
+ timenow = pd.Timestamp.utcnow()\
+ .tz_convert("Asia/Seoul")\
+ .strftime("%Y-%m-%d %H:%M")
+ logger.info(f"{timenow}(KST): start data ETL process")
+
+ start_date = get_st_date()
+ start_date = pd.to_datetime(start_date) + timedelta(hours=1)
+ end_date = pd.Timestamp.utcnow() - timedelta(hours=15)
+ endpoint = "http://apis.data.go.kr/1360000/AsosHourlyInfoService/getWthrDataList"
+
+ stt_date_str = start_date.strftime('%Y-%m-%d')
+ end_date_str = end_date.strftime('%Y-%m-%d')
+
+ if start_date > pd.to_datetime(end_date_str + " 23:00"):
+ up_to_date = 'data is already up to date'
+ logger.info(up_to_date)
+ CFR = CancelFlowRun()
+ CFR.run()
+ return False
+
+ date_range = [start_date,
+ *pd.date_range(stt_date_str,
+ end_date_str)[1:]]
+ logger.info(date_range)
+
+ atmos_data = []
+ for dr in date_range:
+ req_url = (f"{endpoint}?serviceKey={api_key}"
+ f"&numOfRows=24&dataType=JSON&dataCd=ASOS&dateCd=HR"
+ f"&startDt={dr.strftime('%Y%m%d')}&startHh={dr.hour:>02}"
+ f"&endDt={dr.strftime('%Y%m%d')}&endHh=23&stnIds=108")
+
+ resp = requests.get(req_url)
+ logger.info(f"{resp}: {dr}")
+
+ if not resp.ok:
+ logger.error(f"status code: {resp.status_code}")
+ logger.error(f"request error: {resp.text}")
+ break
+
+ try:
+ json_file = resp.json()
+ json_data = json_file['response']['body']['items']['item']
+ except Exception as e:
+ logger.info(f"response text: {resp.text}")
+ logger.error(e)
+ break
+
+ raw_data = list(map(lambda x: x.values(), json_data))
+ atmos_data.extend(raw_data)
+
+ if not atmos_data:
+ logger.error("failed to request data")
+ CFR = CancelFlowRun()
+ CFR.run()
+ return False
+
+ atmos_df = pd.DataFrame(atmos_data,
+ columns = json_data[0].keys())
+ atmos_df = atmos_df[['tm', 'ta', 'rn', 'wd', 'ws', 'pa', 'ps', 'hm']]
+ atmos_df.columns = ['time', 'tmp', 'precip', 'wd', 'ws', 'p', 'mslp', 'rh']
+
+ atmos_df['precip'] = atmos_df['precip'].apply(lambda x: 0 if not x else x)
+ atmos_df['time'] = pd.to_datetime(atmos_df['time'])
+ atmos_df.iloc[:, 1:] = atmos_df.iloc[:, 1:].astype(float)
+
+ return True, atmos_df
+
+
+@task
+def data_validation(new_data):
+
+ logger = prefect.context.get("logger")
+ logger.info((f"data type: {type(new_data)}\n\r"
+ f"data shape: {new_data.shape}"))
+
+ start_date = get_st_date()
+ org_data = get_org_data(start_date)
+ org_stats = tfdv.generate_statistics_from_dataframe(org_data)
+ new_stats = tfdv.generate_statistics_from_dataframe(new_data)
+
+ org_schema = tfdv.infer_schema(org_stats)
+
+ for i in org_data.keys()[1:]:
+ temp=tfdv.get_feature(org_schema, i)
+ temp.drift_comparator.infinity_norm.threshold = 0.01
+
+ drift_anomaly = tfdv.validate_statistics(statistics=new_stats,
+ schema=org_schema,
+ previous_statistics=org_stats)
+
+ drift_stats = []
+ for anm in drift_anomaly.drift_skew_info:
+ if anm.drift_measurements[0].value > anm.drift_measurements[0].threshold:
+ drift_stats.append(anm)
+ logger.info(f"data drift vars: {drift_stats}")
+
+ if not drift_stats:
+ logger.info(True)
+ return True, new_data
+ else:
+ logger.info(False)
+ logger.info(drift_stats)
+ CFR = CancelFlowRun()
+ CFR.run()
+ return False
+
+
+@task
+def data_load_to_db(new_data, ps_user, ps_host, ps_pw):
+ logger = prefect.context.get("logger")
+ logger.info((f"data type: {type(new_data)}\n\r"
+ f"data shape: {new_data.shape}"))
+ try:
+ load_to_db(new_data)
+ logger.info("data has been saved successfully!")
+ return True
+ except Exception as e:
+ logger.error(e)
+ CFR = CancelFlowRun()
+ CFR.run()
+ return False
+
+
+@task
+def train_mlflow_ray(load_data_suc, host_url, exp_name, metric, num_trials):
+ mlflow.set_tracking_uri(host_url)
+ mlflow.set_experiment(exp_name)
+
+ it = AtmosTuner(
+ host_url=host_url, exp_name=exp_name, metric=metric
+ )
+ it.exec(num_trials=num_trials)
+
+ return True
+
+
+@task
+def log_best_model(is_end, host_url, exp_name, metric, model_type):
+ mlflow.set_tracking_uri(host_url)
+
+ client = MlflowClient()
+ exp_id = client.get_experiment_by_name(exp_name).experiment_id
+ runs = mlflow.search_runs([exp_id])
+
+ best_score = runs["metrics.mae"].min()
+ best_run = runs[runs["metrics.mae"] == best_score]
+ run_id = best_run.run_id.item()
+
+ save_best_model(
+ run_id,
+ model_type,
+ metric,
+ metric_score=best_score,
+ model_name=exp_name,
+ )
+
+
+# if __name__ == '__main__':
+# host_url = os.getenv('MLFLOW_HOST')
+# exp_name = "atmos_tmp"
+# metric = "mae"
+# model_type = "tensorflow"
+# num_trials = 10
+# extr_result = data_extract(os.getenv('ATMOS_API_KEY'))
+# valid_result = data_validation(extr_result[1])
+# load_data = data_load_to_db(valid_result[1],
+# os.getenv("POSTGRES_USER"),
+# os.getenv("POSTGRES_SERVER"),
+# os.getenv("POSTGRES_PASSWORD"))
+# is_end = train_mlflow_ray(load_data,
+# host_url,
+# exp_name,
+# metric,
+# num_trials)
+# log_best_model(is_end, host_url, exp_name, metric, model_type)
\ No newline at end of file
diff --git a/prefect/atmos_tmp_pipeline/utils.py b/prefect/atmos_tmp_pipeline/utils.py
new file mode 100644
index 0000000..938d4f5
--- /dev/null
+++ b/prefect/atmos_tmp_pipeline/utils.py
@@ -0,0 +1,273 @@
+from abc import ABC, abstractmethod
+from dotenv import load_dotenv
+import os
+import sqlalchemy
+import pandas as pd
+
+import numpy as np
+import pandas as pd
+from tensorflow import keras
+from tensorflow.keras.models import Sequential
+from tensorflow.keras.layers import Dense
+from tensorflow.keras.callbacks import EarlyStopping
+from tensorflow.keras.layers import GRU
+from sklearn.metrics import mean_absolute_error, mean_squared_error
+from sklearn.model_selection import train_test_split
+from ray import tune
+import mlflow
+from mlflow.tracking import MlflowClient
+from mlflow.models.signature import ModelSignature
+from mlflow.types.schema import Schema, TensorSpec
+from query import INSERT_BEST_MODEL, SELECT_EXIST_MODEL, UPDATE_BEST_MODEL
+
+
+def connect(db):
+ """Returns a connection and a metadata object"""
+
+ load_dotenv(verbose=True)
+
+ POSTGRES_USER = os.getenv("POSTGRES_USER")
+ POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD")
+ POSTGRES_SERVER = os.getenv("POSTGRES_SERVER")
+ POSTGRES_PORT = os.getenv("POSTGRES_PORT")
+ POSTGRES_DB = db
+
+ url = f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_SERVER}:{POSTGRES_PORT}/{POSTGRES_DB}"
+
+ connection = sqlalchemy.create_engine(url)
+
+ return connection
+
+conn = connect('postgres')
+load_dotenv(verbose=True)
+
+def load_to_db(data):
+ data.to_sql("atmos_stn108", conn, index=False, if_exists='append')
+
+
+def get_st_date():
+ start_date = conn.execute(
+ "SELECT time FROM atmos_stn108 ORDER BY time DESC;"
+ ).fetchone()[0]
+ return start_date
+
+
+def get_org_data(start_date):
+ org_data_query = f"""
+ SELECT *
+ FROM atmos_stn108
+ WHERE time < '{start_date}';
+ """
+ org_data = pd.read_sql(org_data_query, conn)
+ return org_data
+
+
+def preprocess(data):
+ # missing data
+ data = data.fillna(method="ffill")
+
+ # etc.
+
+ return data
+
+
+class Tuner(ABC):
+ def __init__(self):
+ self.model = None
+ self.data_X = None
+ self.data_y = None
+ self.config = None
+
+
+ def _split(self, test_size):
+ """
+ self.data_X, self.data_y ๋ฅผ split
+ data_X์ data_y๋ ์์๋ฐ์ class์์ ๊ฐ์ ๋ฐ๊ฒ ๋์ด์์.
+ """
+ train_X, valid_X, train_y, valid_y = train_test_split(
+ self.data_X,
+ self.data_y,
+ test_size=test_size,
+ )
+
+ return train_X, valid_X, train_y, valid_y
+
+
+ def _split_ts(self, data, label, window_size=365, predsize=None):
+ feature_list = []
+ label_list = []
+
+ if isinstance(predsize, int):
+ for i in range(len(data) - (window_size + predsize)):
+ feature_list.append(np.array(data.iloc[i : i + window_size]))
+ label_list.append(
+ np.array(label.iloc[i + window_size : i + window_size + predsize])
+ )
+ else:
+ for i in range(len(data) - window_size):
+ feature_list.append(np.array(data.iloc[i : i + window_size]))
+ label_list.append(np.array(label.iloc[i + window_size]))
+
+ return np.array(feature_list), np.array(label_list)
+
+
+ def _get_divided_index(self, data_length, ratio):
+ """
+ return index based on ratio
+ --------------------------------------------------
+ example
+
+ >>> split_data(data_length = 20, ratio = [1,2,3])
+ [3, 10]
+ --------------------------------------------------
+ """
+ ratio = np.cumsum(np.array(ratio) / np.sum(ratio))
+
+ idx = []
+ for i in ratio[:-1]:
+ idx.append(round(data_length * i))
+
+ return idx
+
+
+ @abstractmethod
+ def exec(self):
+ pass
+
+
+class AtmosTuner(Tuner):
+ def __init__(self, host_url, exp_name, metric):
+ self.host_url = host_url
+ self.exp_name = exp_name
+ self.metric = metric
+ self.TUNE_METRIC_DICT = {"mae": "min",
+ "mse": "min",
+ "rmse": "min"}
+
+
+ def _log_experiments(self, config, metrics, tf_model):
+ best_score = None
+ mlflow.set_tracking_uri(self.host_url)
+
+ client = MlflowClient()
+ exp_id = client.get_experiment_by_name(self.exp_name).experiment_id
+ runs = mlflow.search_runs([exp_id])
+
+ if len(runs) > 0:
+ try:
+ best_score = runs[f"metrics.{self.metric}"].min()
+ except Exception as e:
+ print(e)
+
+ with mlflow.start_run(experiment_id=exp_id):
+ mlflow.log_metrics(metrics)
+ mlflow.log_params(config)
+
+ if not best_score or best_score > metrics[self.metric]:
+ print("log model")
+ input_schema = Schema([TensorSpec(np.dtype(np.float), (-1, 72, 1))])
+ output_schema = Schema([TensorSpec(np.dtype(np.float32), (-1, 24))])
+ signature = ModelSignature(inputs=input_schema, outputs=output_schema)
+
+ mlflow.keras.log_model(tf_model,
+ signature = signature,
+ artifact_path = "model")
+
+
+ def _trainable(self, config):
+ data = pd.read_sql("select tmp \
+ from atmos_stn108 \
+ where time > '2020-12-31 23:00';", conn)
+ data = preprocess(data)
+ train_feature, train_label = self._split_ts(data, data, 72, 24)
+
+ idx = self._get_divided_index(train_feature.shape[0], [6, 3, 1])
+ X_train, X_valid, X_test = (
+ train_feature[: idx[0]],
+ train_feature[idx[0] : idx[1]],
+ train_feature[idx[1] :],
+ )
+ y_train, y_valid, y_test = (
+ train_label[: idx[0]],
+ train_label[idx[0] : idx[1]],
+ train_label[idx[1] :],
+ )
+
+ model = Sequential()
+ for layer in range(config["layer_n"]):
+ if layer == config["layer_n"] - 1:
+ model.add(GRU(config["cell"]))
+ else:
+ model.add(
+ GRU(
+ config["cell"],
+ return_sequences=True,
+ input_shape=[None, train_feature.shape[2]],
+ )
+ )
+ model.add(Dense(24))
+
+ model.compile(loss=self.metric, optimizer=keras.optimizers.Adam(lr=0.001))
+ early_stop = EarlyStopping(monitor="val_loss", patience=5)
+
+ model.fit(
+ X_train,
+ y_train,
+ epochs=2,
+ batch_size=128,
+ validation_data=(X_valid, y_valid),
+ callbacks=[early_stop],
+ )
+
+ y_true = y_test.reshape(y_test.shape[0], y_test.shape[1])
+ y_hat = model.predict(X_test)
+
+ mae = mean_absolute_error(y_true, y_hat)
+ mse = mean_squared_error(y_true, y_hat)
+
+ return {"mae":mae, "mse":mse}, model
+
+
+ def _run(self, config):
+ metrics, tf_model = self._trainable(config)
+
+ self._log_experiments(config, metrics, tf_model)
+ tune.report(**metrics)
+
+
+ def exec(self, tune_config=None, num_trials=1):
+ DEFAULT_CONFIG = {
+ "layer_n": tune.randint(2, 3),
+ "cell": tune.randint(24, 30)
+ }
+
+ config = tune_config if tune_config else DEFAULT_CONFIG
+ tune.run(
+ self._run,
+ config=config,
+ metric=self.metric,
+ mode=self.TUNE_METRIC_DICT[self.metric],
+ num_samples=num_trials,
+ )
+
+def save_best_model(
+ run_id, model_type, metric, metric_score, model_name
+):
+
+ exist_model = conn.execute(
+ SELECT_EXIST_MODEL.format(model_name)
+ ).fetchone()
+
+ # ์
๋ฐ์ดํธ
+ if exist_model and exist_model.metric_score >= metric_score:
+ conn.execute(
+ UPDATE_BEST_MODEL.format(
+ run_id, model_type, metric, metric_score, model_name
+ )
+ )
+ else: # ์์ฑ
+ conn.execute(
+ INSERT_BEST_MODEL.format(
+ model_name, run_id, model_type, metric, metric_score
+ )
+ )
\ No newline at end of file
diff --git a/prefect/insurance/Pipeline.py b/prefect/insurance/Pipeline.py
new file mode 100644
index 0000000..5f71cdd
--- /dev/null
+++ b/prefect/insurance/Pipeline.py
@@ -0,0 +1,91 @@
+from mlflow.entities import experiment
+from prefect.run_configs.local import LocalRun
+from prefect.schedules.schedules import CronSchedule
+from task import etl, log_best_model, train_mlflow_ray
+
+import prefect
+from prefect import Flow, Parameter
+
+
+class Pipeline:
+ _project_name = None
+ _flow_name = None
+ _logger = None
+ _flow = None
+
+ """
+ _param1 = Parameter("data_path", default="default_path")
+ _param2 = Parameter("model_name", default="GPN")
+ """
+
+ def __init__(self, project_name, flow_name, schedule=None):
+ self._logger = prefect.context.get("logger")
+ self._logger.info("Create Pipeline")
+
+ self._project_name = project_name
+ self._flow_name = flow_name
+ self._schedule = schedule
+
+ def create_flow(self):
+ self._logger.info(f"Create {self._flow_name} flow")
+ with Flow(self._flow_name) as flow:
+ """
+
+ data = load_data(self._param1)
+ prep_data = preprocess(data)
+ model = train(self._param2, prep_data)
+ save_model(model)
+
+ """
+ extract_query = Parameter(
+ "extract_query", "SELECT * FROM insurance"
+ )
+
+ host_url = Parameter("host_url", "http://mlflow-service:5000")
+ exp_name = Parameter("exp_name", "insurance")
+ metric = Parameter("metric", "mae")
+ model_type = Parameter("model_type", "xgboost")
+ num_trials = Parameter("num_trials", 10)
+
+ X, y = etl(extract_query)
+
+ is_end = train_mlflow_ray(
+ X, y, host_url, exp_name, metric, num_trials
+ )
+
+ if is_end:
+ log_best_model(is_end, host_url, exp_name, metric, model_type)
+
+ flow.run_config = LocalRun(working_dir="prefect/insurance")
+
+ self._flow = flow
+ self._register()
+
+ def _register(self):
+ self._logger.info(
+ f"Regist {self._flow_name} flow to {self._project_name} project"
+ )
+ self._logger.info(f"Set Cron {self._schedule}")
+
+ self._flow.register(
+ project_name=self._project_name,
+ idempotency_key=self.flow.serialized_hash(),
+ )
+
+ if self._schedule:
+ self._set_cron()
+
+ def _set_cron(self):
+ self.flow.schedule(CronSchedule(self._schedule))
+
+ @property
+ def flow(self):
+ return self._flow
+
+ @property
+ def project_name(self):
+ return self._project_name
+
+ @property
+ def flow_name(self):
+ return self._flow_name
diff --git a/prefect/insurance/db.py b/prefect/insurance/db.py
new file mode 100644
index 0000000..bb65534
--- /dev/null
+++ b/prefect/insurance/db.py
@@ -0,0 +1,23 @@
+import pandas as pd
+import sqlalchemy
+import os
+
+
+def connect(db):
+ """Returns a connection and a metadata object"""
+
+ POSTGRES_USER = os.getenv("POSTGRES_USER")
+ POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD")
+ POSTGRES_SERVER = os.getenv("POSTGRES_SERVER")
+ POSTGRES_PORT = os.getenv("POSTGRES_PORT")
+ POSTGRES_DB = db
+
+ url = f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_SERVER}:{POSTGRES_PORT}/{POSTGRES_DB}"
+
+ connection = sqlalchemy.create_engine(url)
+
+ return connection
+
+
+POSTGRES_DB = os.getenv("POSTGRES_DB")
+engine = connect(POSTGRES_DB)
\ No newline at end of file
diff --git a/prefect/insurance/main.py b/prefect/insurance/main.py
new file mode 100644
index 0000000..29ea4bb
--- /dev/null
+++ b/prefect/insurance/main.py
@@ -0,0 +1,9 @@
+from Pipeline import Pipeline
+from prefect.schedules.clocks import CronClock
+
+from prefect import Client
+
+if __name__ == "__main__":
+ Client().create_project(project_name="insurance")
+ pipeline = Pipeline("insurance", "insurance_flow")
+ pipeline.create_flow()
diff --git a/prefect/insurance/query.py b/prefect/insurance/query.py
new file mode 100644
index 0000000..8322b5e
--- /dev/null
+++ b/prefect/insurance/query.py
@@ -0,0 +1,32 @@
+INSERT_BEST_MODEL = """
+ INSERT INTO best_model_data (
+ model_name,
+ run_id,
+ model_type,
+ metric,
+ metric_score
+ ) VALUES (
+ '{}',
+ '{}',
+ '{}',
+ '{}',
+ {}
+ )
+ """
+
+SELECT_EXIST_MODEL = """
+ SELECT *
+ FROM best_model_data
+ WHERE model_name = '{}'
+"""
+
+UPDATE_BEST_MODEL = """
+ UPDATE best_model_data
+ SET
+ run_id = '{}',
+ model_type = '{}',
+ metric = '{}',
+ metric_score = {}
+ WHERE
+ model_name = '{}'
+"""
diff --git a/prefect/insurance/task.py b/prefect/insurance/task.py
new file mode 100644
index 0000000..1bed869
--- /dev/null
+++ b/prefect/insurance/task.py
@@ -0,0 +1,236 @@
+import os
+import random
+from abc import ABC, abstractmethod
+
+import mlflow
+import pandas as pd
+import xgboost as xgb
+from db import engine
+from mlflow.tracking import MlflowClient
+from query import INSERT_BEST_MODEL, SELECT_EXIST_MODEL, UPDATE_BEST_MODEL
+from ray import tune
+from sklearn.linear_model import LogisticRegression
+from sklearn.model_selection import train_test_split
+from sklearn.preprocessing import LabelEncoder, StandardScaler
+
+from prefect import task
+
+
+class ETL:
+ def __init__(self, data_extract_query):
+ self.df = None
+ self.data_extract_query = data_extract_query
+
+ def _extract(self):
+ self.df = pd.read_sql(self.data_extract_query, engine)
+
+ def _scaling(self, scale_list, scaler):
+ self.df.loc[:, scale_list] = scaler().fit_transform(
+ self.df.loc[:, scale_list]
+ )
+
+ def _encoder(self, enc_list, encoder):
+ for col in enc_list:
+ self.df.loc[:, col] = encoder().fit_transform(self.df.loc[:, col])
+
+ def _load(self):
+ return self.df.iloc[:, :-1].values, self.df.iloc[:, -1].values
+
+ def exec(self, *args):
+ self._extract()
+ if args is not None:
+ for trans_list, transformer in args:
+ if "encoder" in transformer.__name__.lower():
+ self._encoder(trans_list, transformer)
+ elif "scaler" in transformer.__name__.lower():
+ self._scaling(trans_list, transformer)
+ else:
+ break
+ return self._load()
+
+
+class Tuner(ABC):
+ def __init__(self):
+ self.model = None
+ self.data_X = None
+ self.data_y = None
+ self.config = None
+
+ def _split(self, test_size):
+ """
+ self.data_X, self.data_y ๋ฅผ split
+ data_X์ data_y๋ ์์๋ฐ์ class์์ ๊ฐ์ ๋ฐ๊ฒ ๋์ด์์.
+ """
+ train_X, valid_X, train_y, valid_y = train_test_split(
+ self.data_X,
+ self.data_y,
+ test_size=test_size,
+ )
+
+ return train_X, valid_X, train_y, valid_y
+
+ @abstractmethod
+ def exec(self):
+ pass
+
+
+class InsuranceTuner(Tuner):
+ def __init__(self, data_X, data_y, host_url, exp_name, metric):
+ self.host_url = host_url
+ self.exp_name = exp_name
+ self.metric = metric
+ self.data_X = data_X
+ self.data_y = data_y
+ self.TUNE_METRIC_DICT = {"mae": "min", "mse": "min", "rmse": "min"}
+
+ def _log_experiments(self, config, metrics, xgb_model):
+ best_score = None
+ mlflow.set_tracking_uri(self.host_url)
+
+ client = MlflowClient()
+ exp_id = client.get_experiment_by_name(self.exp_name).experiment_id
+ runs = mlflow.search_runs([exp_id])
+
+ if len(runs) > 0:
+ try:
+ best_score = runs[f"metrics.{self.metric}"].min()
+ except Exception as e:
+ print(e)
+
+ with mlflow.start_run(experiment_id=exp_id):
+ mlflow.log_metrics(metrics)
+ mlflow.log_params(config)
+
+ if not best_score or best_score > metrics[self.metric]:
+ print("log model")
+ mlflow.xgboost.log_model(
+ xgb_model,
+ artifact_path="model",
+ )
+
+ def _trainable(self, config):
+ train_x, test_x, train_y, test_y = super()._split(0.2)
+ train_set = xgb.DMatrix(train_x, label=train_y)
+ test_set = xgb.DMatrix(test_x, label=test_y)
+
+ results = {}
+ xgb_model = xgb.train(
+ config,
+ train_set,
+ evals=[(test_set, "eval")],
+ evals_result=results,
+ verbose_eval=False,
+ )
+ return results["eval"], xgb_model
+
+ def _run(self, config):
+ results, xgb_model = self._trainable(config)
+
+ metrics = {
+ "mae": min(results["mae"]),
+ "rmse": min(results["rmse"]),
+ }
+
+ self._log_experiments(config, metrics, xgb_model)
+ tune.report(**metrics)
+
+ def exec(self, tune_config=None, num_trials=1):
+ DEFAULT_CONFIG = {
+ "objective": "reg:squarederror",
+ "eval_metric": ["mae", "rmse"],
+ "max_depth": tune.randint(1, 9),
+ "min_child_weight": tune.choice([1, 2, 3]),
+ "subsample": tune.uniform(0.5, 1.0),
+ "eta": tune.loguniform(1e-4, 1e-1),
+ }
+
+ config = tune_config if tune_config else DEFAULT_CONFIG
+ tune.run(
+ self._run,
+ config=config,
+ metric=self.metric,
+ mode=self.TUNE_METRIC_DICT[self.metric],
+ num_samples=num_trials,
+ )
+
+
+def save_best_model(run_id, model_type, metric, metric_score, model_name):
+
+ exist_model = engine.execute(
+ SELECT_EXIST_MODEL.format(model_name)
+ ).fetchone()
+
+ # ์
๋ฐ์ดํธ
+ if exist_model and exist_model.metric_score >= metric_score:
+ engine.execute(
+ UPDATE_BEST_MODEL.format(
+ run_id, model_type, metric, metric_score, model_name
+ )
+ )
+ else: # ์์ฑ
+ engine.execute(
+ INSERT_BEST_MODEL.format(
+ model_name, run_id, model_type, metric, metric_score
+ )
+ )
+
+
+@task(nout=2)
+def etl(query):
+ etl = ETL(query)
+
+ label_encode = [["sex", "smoker", "region"], LabelEncoder]
+ standard_scale = [["age", "bmi", "children"], StandardScaler]
+
+ X, y = etl.exec(label_encode, standard_scale)
+
+ return X, y
+
+
+@task
+def train_mlflow_ray(X, y, host_url, exp_name, metric, num_trials):
+ mlflow.set_tracking_uri(host_url)
+ mlflow.set_experiment(exp_name)
+
+ it = InsuranceTuner(
+ data_X=X, data_y=y, host_url=host_url, exp_name=exp_name, metric=metric
+ )
+ it.exec(num_trials=num_trials)
+
+ return True
+
+
+@task
+def log_best_model(is_end, host_url, exp_name, metric, model_type):
+ mlflow.set_tracking_uri(host_url)
+
+ client = MlflowClient()
+ exp_id = client.get_experiment_by_name(exp_name).experiment_id
+ runs = mlflow.search_runs([exp_id])
+
+ best_score = runs["metrics.mae"].min()
+ best_run = runs[runs["metrics.mae"] == best_score]
+ run_id = best_run.run_id.item()
+
+ save_best_model(
+ run_id,
+ model_type,
+ metric,
+ metric_score=best_score,
+ model_name=exp_name,
+ )
+
+
+# if __name__ == "__main__":
+# extract_query = "SELECT * FROM insurance"
+# host_url = "http://localhost:5001"
+# exp_name = "insurance"
+# metric = "mae"
+# model_type = "xgboost"
+# num_trials = 1
+
+# X, y = etl(extract_query)
+# is_end = train_mlflow_ray(X, y, host_url, exp_name, metric, num_trials)
+
+# if is_end:
+# log_best_model(is_end, host_url, exp_name, metric, model_type)
diff --git a/prefect/mnist/Pipeline.py b/prefect/mnist/Pipeline.py
new file mode 100644
index 0000000..e026ad6
--- /dev/null
+++ b/prefect/mnist/Pipeline.py
@@ -0,0 +1,100 @@
+from prefect.run_configs import LocalRun
+from prefect.schedules.schedules import CronSchedule
+from task import (
+ case2,
+ log_experiment,
+ make_feature_weight,
+ train_knn,
+ tune_cnn,
+)
+
+import prefect
+from prefect import Flow, Parameter, case
+
+
+class Pipeline:
+ _project_name = None
+ _flow_name = None
+ _logger = None
+ _flow = None
+
+ """
+ _param1 = Parameter("data_path", default="default_path")
+ _param2 = Parameter("model_name", default="GPN")
+ """
+
+ def __init__(self, project_name, flow_name, schedule=None):
+ self._logger = prefect.context.get("logger")
+ self._logger.info("Create Pipeline")
+
+ self._project_name = project_name
+ self._flow_name = flow_name
+ self._schedule = schedule
+
+ def create_flow(self):
+ self._logger.info(f"Create {self._flow_name} flow")
+ with Flow(self._flow_name) as flow:
+ """
+
+ data = load_data(self._param1)
+ prep_data = preprocess(data)
+ model = train(self._param2, prep_data)
+ save_model(model)
+
+ """
+
+ host_url = Parameter("host_url", "http://mlflow-service:5000")
+ exp_name = Parameter("exp_name", "mnist")
+ metric = Parameter("metric", "loss")
+ num_samples = Parameter("num_samples", 1)
+ max_num_epochs = Parameter("max_num_epochs", 1)
+ is_cloud = Parameter("is_cloud", True)
+ data_version = Parameter("data_version", 3)
+
+ results = tune_cnn(
+ num_samples, max_num_epochs, is_cloud, data_version, exp_name
+ )
+ is_end = log_experiment(
+ results, host_url, exp_name, metric, data_version, is_cloud
+ )
+
+ with case(is_end, True):
+ feature_weight_df = make_feature_weight(
+ results, "cpu", is_cloud, data_version, exp_name
+ )
+ train_knn(feature_weight_df, metric, exp_name)
+
+ with case(is_end, False):
+ case2()
+ flow.run_config = LocalRun(working_dir="prefect/mnist")
+ self._flow = flow
+ self._register()
+
+ def _register(self):
+ self._logger.info(
+ f"Regist {self._flow_name} flow to {self._project_name} project"
+ )
+ self._logger.info(f"Set Cron {self._schedule}")
+
+ self._flow.register(
+ project_name=self._project_name,
+ idempotency_key=self.flow.serialized_hash(),
+ )
+
+ if self._schedule:
+ self._set_cron()
+
+ def _set_cron(self):
+ self.flow.schedule((self._schedule))
+
+ @property
+ def flow(self):
+ return self._flow
+
+ @property
+ def project_name(self):
+ return self._project_name
+
+ @property
+ def flow_name(self):
+ return self._flow_name
diff --git a/prefect/mnist/main.py b/prefect/mnist/main.py
new file mode 100644
index 0000000..86134f9
--- /dev/null
+++ b/prefect/mnist/main.py
@@ -0,0 +1,9 @@
+from Pipeline import Pipeline
+from prefect.schedules.clocks import CronClock
+
+from prefect import Client
+
+if __name__ == "__main__":
+ Client().create_project(project_name="mnist")
+ pipeline = Pipeline("mnist", "mnist_flow")
+ pipeline.create_flow()
diff --git a/prefect/mnist/model.py b/prefect/mnist/model.py
new file mode 100644
index 0000000..ac4c09e
--- /dev/null
+++ b/prefect/mnist/model.py
@@ -0,0 +1,29 @@
+import torch
+
+
+class MnistNet(torch.nn.Module):
+ def __init__(self, l1):
+ super(MnistNet, self).__init__()
+ self.layer1 = torch.nn.Sequential(
+ torch.nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1),
+ torch.nn.ReLU(),
+ torch.nn.MaxPool2d(kernel_size=2, stride=2),
+ )
+ self.layer2 = torch.nn.Sequential(
+ torch.nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
+ torch.nn.ReLU(),
+ torch.nn.MaxPool2d(kernel_size=2, stride=2),
+ )
+ self.flatten = torch.nn.Flatten()
+ self.fc = torch.nn.Linear(7 * 7 * 64, l1, bias=True)
+ self.fc2 = torch.nn.Linear(l1, 32, bias=True)
+ self.last_layer = torch.nn.Linear(32, 10, bias=True)
+
+ def forward(self, x):
+ out = self.layer1(x)
+ out = self.layer2(out)
+ out = self.flatten(out)
+ out = self.fc(out)
+ out = self.fc2(out)
+ out = self.last_layer(out)
+ return out
diff --git a/prefect/mnist/query.py b/prefect/mnist/query.py
new file mode 100644
index 0000000..8322b5e
--- /dev/null
+++ b/prefect/mnist/query.py
@@ -0,0 +1,32 @@
+INSERT_BEST_MODEL = """
+ INSERT INTO best_model_data (
+ model_name,
+ run_id,
+ model_type,
+ metric,
+ metric_score
+ ) VALUES (
+ '{}',
+ '{}',
+ '{}',
+ '{}',
+ {}
+ )
+ """
+
+SELECT_EXIST_MODEL = """
+ SELECT *
+ FROM best_model_data
+ WHERE model_name = '{}'
+"""
+
+UPDATE_BEST_MODEL = """
+ UPDATE best_model_data
+ SET
+ run_id = '{}',
+ model_type = '{}',
+ metric = '{}',
+ metric_score = {}
+ WHERE
+ model_name = '{}'
+"""
diff --git a/prefect/mnist/task.py b/prefect/mnist/task.py
new file mode 100644
index 0000000..46d5708
--- /dev/null
+++ b/prefect/mnist/task.py
@@ -0,0 +1,236 @@
+import os
+import time
+from functools import partial
+
+import mlflow
+import numpy as np
+import pandas as pd
+import torch
+import torchvision.transforms as transforms
+from dotenv import load_dotenv
+from mlflow.tracking import MlflowClient
+from scipy.special import softmax
+from model import MnistNet
+from ray import tune
+from ray.tune.schedulers import ASHAScheduler
+from sklearn.neighbors import KNeighborsClassifier
+from mlflow.types.schema import Schema, TensorSpec
+from mlflow.models.signature import ModelSignature
+from torch.utils.data import DataLoader
+from utils import (
+ MnistDataset,
+ MnistNet,
+ cnn_training,
+ load_data,
+ save_best_model,
+ get_mnist_avg
+)
+
+from prefect import task
+
+load_dotenv()
+
+
+@task
+def tune_cnn(num_samples, max_num_epochs, is_cloud, data_version, exp_name):
+
+ config = {
+ "l1": tune.sample_from(lambda _: 2 ** np.random.randint(7, 9)),
+ "lr": tune.loguniform(1e-4, 1e-1),
+ "batch_size": tune.choice([64, 128, 256]),
+ }
+
+ scheduler = ASHAScheduler(
+ metric="loss",
+ mode="min",
+ max_t=max_num_epochs,
+ grace_period=1,
+ reduction_factor=2,
+ )
+
+ result = tune.run(
+ partial(
+ cnn_training,
+ is_cloud=is_cloud,
+ data_version=data_version,
+ exp_name=exp_name,
+ ),
+ config=config,
+ num_samples=num_samples,
+ scheduler=scheduler,
+ )
+
+ return result
+
+
+@task
+def log_experiment(results, host_url, exp_name, metric, data_version, is_cloud):
+ mlflow.set_tracking_uri(host_url)
+ mlflow.set_experiment(exp_name)
+ client = MlflowClient()
+ exp = client.get_experiment_by_name(exp_name)
+
+ best_trial = results.get_best_trial("loss", "min", "last")
+ train_df, valid_df = load_data(is_cloud, data_version, exp_name)
+ train_avg = get_mnist_avg(train_df)
+ valid_avg = get_mnist_avg(valid_df)
+
+ train_avg = {f'color_avg_{k}':v for k, v in enumerate(train_avg)}
+ valid_avg = {f'color_avg_{k}':v for k, v in enumerate(valid_avg)}
+
+ metrics = {
+ "loss": best_trial.last_result["loss"],
+ "accuracy": best_trial.last_result["accuracy"],
+ }
+ configs = {
+ "l1": best_trial.config["l1"],
+ "lr": best_trial.config["lr"],
+ "batch_size": best_trial.config["batch_size"],
+ "data_version": data_version,
+ }
+ result_pred = best_trial.last_result["result_pred"]
+ metrics.update(result_pred)
+ configs.update(train_avg)
+ configs.update(valid_avg)
+ best_trained_model = MnistNet(configs["l1"])
+ best_checkpoint_dir = best_trial.checkpoint.value
+ model_state, optimizer_state = torch.load(
+ os.path.join(best_checkpoint_dir, "checkpoint")
+ )
+ best_trained_model.load_state_dict(model_state)
+ best_trained_model = torch.jit.script(best_trained_model)
+ exp_id = exp.experiment_id
+ runs = mlflow.search_runs([exp_id])
+ input_schema = Schema([
+ TensorSpec(np.dtype(np.uint8), (-1, 28, 28, 1)),
+ ])
+ output_schema = Schema([TensorSpec(np.dtype(np.float32), (-1, 10))])
+ signature = ModelSignature(inputs=input_schema, outputs=output_schema)
+ if runs.empty:
+ with mlflow.start_run(experiment_id=exp_id):
+ mlflow.log_metrics(metrics)
+ mlflow.log_params(configs)
+ mlflow.pytorch.log_model(best_trained_model, signature = signature, artifact_path="model")
+
+ save_best_model(
+ exp_name, "pytorch", metric, metrics[metric], exp_name
+ )
+ return True
+ else:
+ best_score = runs[f"metrics.{metric}"].min()
+
+ if best_score > metrics[metric]:
+ with mlflow.start_run(experiment_id=exp_id):
+ mlflow.log_metrics(metrics)
+ mlflow.log_params(configs)
+ mlflow.pytorch.log_model(
+ best_trained_model, signature = signature, artifact_path="model"
+ )
+ save_best_model(
+ exp_name, "pytorch", metric, metrics[metric], exp_name
+ )
+ return True
+ else:
+ return False
+
+
+@task
+def make_feature_weight(results, device, is_cloud, data_version, exp_name):
+ best_trial = results.get_best_trial("loss", "min", "last")
+
+ train_df, _ = load_data(is_cloud, data_version, exp_name)
+
+ configs = {
+ "l1": best_trial.config["l1"],
+ "lr": best_trial.config["lr"],
+ "batch_size": best_trial.config["batch_size"],
+ }
+ best_trained_model = MnistNet(configs["l1"])
+ best_checkpoint_dir = best_trial.checkpoint.value
+ model_state, _ = torch.load(
+ os.path.join(best_checkpoint_dir, "checkpoint")
+ )
+ best_trained_model.load_state_dict(model_state)
+ best_trained_model2 = torch.nn.Sequential(
+ *list(best_trained_model.children())[:-1]
+ )
+ transform = transforms.Compose(
+ [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
+ )
+ trainset = MnistDataset(train_df, transform)
+ train_loader = DataLoader(trainset, batch_size=int(configs["batch_size"]))
+
+ temp = pd.DataFrame(
+ columns=[f"{i}_feature" for i in range(74)], index=train_df.index
+ )
+ batch_index = 0
+ batch_size = train_loader.batch_size
+ optimizer = torch.optim.Adam(
+ best_trained_model2.parameters(), lr=configs["lr"]
+ )
+
+ for i, (mini_batch, _) in enumerate(train_loader):
+ add_weight = 10
+ mini_batch = mini_batch.to(device)
+ optimizer.zero_grad()
+ outputs = best_trained_model2(mini_batch)
+ preds = best_trained_model(mini_batch)
+ batch_index = i * batch_size
+ temp.iloc[
+ batch_index : batch_index + batch_size, :
+ ] = np.concatenate((outputs.detach().numpy(), softmax(preds.detach().numpy().astype(float)) * add_weight), axis=1)
+
+
+ temp.reset_index(inplace=True)
+ feature_weight_df = temp
+
+ return feature_weight_df
+
+
+@task
+def train_knn(feature_weight_df, metric, exp_name):
+ KNN = KNeighborsClassifier(n_neighbors=3)
+ KNN.fit(
+ feature_weight_df.iloc[:, 1:].values,
+ feature_weight_df.iloc[:, 0].values,
+ )
+
+ mlflow.sklearn.log_model(KNN, artifact_path="model")
+ mlflow.log_param("time", time.time())
+ save_best_model("mnist_knn", "sklearn", metric, 9999, exp_name, True)
+
+
+@task
+def case2():
+ print("end")
+
+
+# if __name__ == "__main__":
+# # data_path = "C:\Users\TFG5076XG\Documents\MLOps\prefect\mnist\mnist.csv"
+# host_url = "http://localhost:5000"
+# exp_name = "mnist"
+# device = "cpu"
+# num_samples = 1
+# max_num_epochs = 1
+# metric = 'loss'
+# is_cloud=True
+# data_version = 3
+
+# mlflow.set_tracking_uri(host_url)
+# mlflow.set_experiment(exp_name)
+
+# results = tune_cnn(
+# num_samples, max_num_epochs, is_cloud, data_version, exp_name
+# )
+# is_end = log_experiment(
+# results, host_url, exp_name, metric, data_version, is_cloud
+# )
+
+# if is_end:
+# feature_weight_df = make_feature_weight(
+# results, "cpu", is_cloud, data_version, exp_name
+# )
+# train_knn(feature_weight_df, metric, exp_name)
+
+# else:
+# print('False')
diff --git a/prefect/mnist/utils.py b/prefect/mnist/utils.py
new file mode 100644
index 0000000..e737bc3
--- /dev/null
+++ b/prefect/mnist/utils.py
@@ -0,0 +1,286 @@
+import os
+from io import StringIO
+
+import mlflow
+import numpy as np
+import pandas as pd
+import sqlalchemy
+import torch
+import torch.nn as nn
+import torch.optim as optim
+import torchvision.transforms as transforms
+from dotenv import load_dotenv
+from google.cloud import storage
+from mlflow.tracking import MlflowClient
+from query import INSERT_BEST_MODEL, SELECT_EXIST_MODEL, UPDATE_BEST_MODEL
+from ray import tune
+from torch.utils.data import DataLoader, Dataset
+
+load_dotenv()
+
+
+def connect(db):
+ """Returns a connection and a metadata object"""
+
+ POSTGRES_USER = os.getenv("POSTGRES_USER")
+ POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD")
+ POSTGRES_SERVER = os.getenv("POSTGRES_SERVER")
+ POSTGRES_PORT = os.getenv("POSTGRES_PORT")
+ POSTGRES_DB = db
+
+ url = f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_SERVER}:{POSTGRES_PORT}/{POSTGRES_DB}"
+
+ connection = sqlalchemy.create_engine(url)
+
+ return connection
+
+
+POSTGRES_DB = os.getenv("POSTGRES_DB")
+engine = connect(POSTGRES_DB)
+
+
+class MnistNet(torch.nn.Module):
+ def __init__(self, l1):
+ super(MnistNet, self).__init__()
+ self.layer1 = torch.nn.Sequential(
+ torch.nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1),
+ torch.nn.ReLU(),
+ torch.nn.MaxPool2d(kernel_size=2, stride=2),
+ )
+ self.layer2 = torch.nn.Sequential(
+ torch.nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
+ torch.nn.ReLU(),
+ torch.nn.MaxPool2d(kernel_size=2, stride=2),
+ )
+ self.flatten = torch.nn.Flatten()
+ self.fc = torch.nn.Linear(7 * 7 * 64, l1, bias=True)
+ self.fc2 = torch.nn.Linear(l1, 64, bias=True)
+ self.last_layer = torch.nn.Linear(64, 10, bias=True)
+
+ def forward(self, x):
+ out = self.layer1(x)
+ out = self.layer2(out)
+ out = self.flatten(out)
+ out = self.fc(out)
+ out = self.fc2(out)
+ out = self.last_layer(out)
+ return out
+
+
+class MnistDataset(Dataset):
+ def __init__(self, data, transform=None):
+ self.data = data
+ self.transform = transform
+
+ def __len__(self):
+ return len(self.data)
+
+ def __getitem__(self, index):
+ item = self.data.iloc[index]
+
+ image = item[1:].values.astype(np.uint8).reshape((28, 28))
+ label = item[0]
+
+ if self.transform is not None:
+ image = self.transform(image)
+
+ return image, label
+
+
+def save_best_model(
+ model_name, model_type, metric, metric_score, exp_name, is_knn=False
+):
+ client = MlflowClient()
+ exp = client.get_experiment_by_name(exp_name)
+ exp_id = exp.experiment_id
+ runs = mlflow.search_runs([exp_id])
+ best_score = runs[f"metrics.{metric}"].min()
+ best_run = runs[runs[f"metrics.{metric}"] == best_score]
+ run_id = best_run.run_id.item()
+ if is_knn:
+ recent_knn = (
+ runs[~runs["params.time"].isna()]["params.time"]
+ .astype(float)
+ .max()
+ )
+ run_id = runs[runs["params.time"] == str(recent_knn)]["run_id"].item()
+
+ exist_model = engine.execute(
+ SELECT_EXIST_MODEL.format(model_name)
+ ).fetchone()
+
+ if exist_model and exist_model.metric_score >= metric_score:
+ engine.execute(
+ UPDATE_BEST_MODEL.format(
+ run_id, model_type, metric, metric_score, model_name
+ )
+ )
+ else:
+ engine.execute(
+ INSERT_BEST_MODEL.format(
+ model_name, run_id, model_type, metric, metric_score
+ )
+ )
+
+
+def load_data_cloud(bucket_name, data_path):
+ storage_client = storage.Client()
+ bucket = storage_client.bucket(bucket_name)
+ blob = bucket.blob(data_path)
+
+ bytes_data = blob.download_as_bytes()
+
+ s = str(bytes_data, "utf-8")
+
+ data = StringIO(s)
+ df = pd.read_csv(data)
+
+ return df
+
+
+def get_data_path_from_db(data_version, exp_name):
+ select_query = """
+ SELECT *
+ FROM data_info
+ where version = {} and exp_name = '{}'
+ """
+ (train_path, _, _, _), (valid_path, _, _, _) = engine.execute(
+ select_query.format(data_version, exp_name)
+ ).fetchall()
+
+ return train_path, valid_path
+
+
+def load_data(is_cloud, data_version, exp_name):
+
+ if is_cloud:
+ CLOUD_STORAGE_NAME = os.getenv("CLOUD_STORAGE_NAME")
+ train_path, valid_path = get_data_path_from_db(data_version, exp_name)
+ train_df = load_data_cloud(CLOUD_STORAGE_NAME, train_path)
+ valid_df = load_data_cloud(CLOUD_STORAGE_NAME, valid_path)
+ else:
+ TRAIN_MNIST = os.getenv("TRAIN_MNIST")
+ VALID_MNIST = os.getenv("VALID_MNIST")
+ train_df = pd.read_csv(TRAIN_MNIST)
+ valid_df = pd.read_csv(VALID_MNIST)
+
+ return train_df, valid_df
+
+
+def cnn_training(
+ config, data_version, exp_name, checkpoint_dir=None, is_cloud=True
+):
+ Net = MnistNet(config["l1"])
+ device = "cpu"
+
+ if torch.cuda.is_available():
+ device = "cuda:0"
+ if torch.cuda.device_count() > 1:
+ Net = nn.DataParallel(Net)
+ Net.to(device)
+
+ criterion = nn.CrossEntropyLoss()
+ optimizer = optim.SGD(Net.parameters(), lr=config["lr"], momentum=0.9)
+
+ if checkpoint_dir:
+ model_state, optimizer_state = torch.load(
+ os.path.join(checkpoint_dir, "checkpoint")
+ )
+ Net.load_state_dict(model_state)
+ optimizer.load_state_dict(optimizer_state)
+
+ transform = transforms.Compose(
+ [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
+ )
+ train_df, valid_df = load_data(is_cloud, data_version, exp_name)
+
+ trainset = MnistDataset(train_df, transform)
+ validset = MnistDataset(valid_df, transform)
+ train_loader = DataLoader(trainset, batch_size=int(config["batch_size"]))
+ valid_loader = DataLoader(validset, batch_size=int(config["batch_size"]))
+
+ for epoch in range(10): # loop over the dataset multiple times
+ running_loss = 0.0
+ epoch_steps = 0
+ for i, data in enumerate(train_loader, 0):
+ inputs, labels = data
+ inputs, labels = inputs.to(device), labels.to(device)
+
+ optimizer.zero_grad()
+
+ outputs = Net(inputs)
+ loss = criterion(outputs, labels)
+ loss.backward()
+ optimizer.step()
+
+ running_loss += loss.item()
+ epoch_steps += 1
+ if i % 2000 == 1999:
+ print(
+ "[%d, %5d] loss: %.3f"
+ % (epoch + 1, i + 1, running_loss / epoch_steps)
+ )
+ running_loss = 0.0
+
+ val_loss = 0.0
+ val_steps = 0
+ total = 0
+ correct = 0
+
+ classes = ("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
+ correct_pred = {classname: 0 for classname in classes}
+ total_pred = {classname: 0 for classname in classes}
+ result_pred = {
+ f"{classname}_acc_percentage": 0 for classname in classes
+ }
+
+ for i, data in enumerate(valid_loader, 0):
+ with torch.no_grad():
+ inputs, labels = data
+ inputs, labels = inputs.to(device), labels.to(device)
+
+ outputs = Net(inputs)
+ _, predicted = torch.max(outputs.data, 1)
+ total += labels.size(0)
+ correct += (predicted == labels).sum().item()
+
+ loss = criterion(outputs, labels)
+ val_loss += loss.cpu().numpy()
+ val_steps += 1
+
+ for label, prediction in zip(labels, predicted):
+ if label == prediction:
+ correct_pred[classes[label]] += 1
+ total_pred[classes[label]] += 1
+
+ with tune.checkpoint_dir(epoch) as checkpoint_dir:
+ path = os.path.join(checkpoint_dir, "checkpoint")
+ torch.save((Net.state_dict(), optimizer.state_dict()), path)
+
+ for classname, correct_count in correct_pred.items():
+ accuracy = round(
+ 100 * float(correct_count) / total_pred[classname], 2
+ )
+ result_pred[f"{classname}_acc_percentage"] = accuracy
+
+ tune.report(
+ loss=(val_loss / val_steps),
+ accuracy=correct / total,
+ result_pred=result_pred,
+ )
+
+
+def preprocess_train(train_df, valid_df, batch_size):
+ transform = transforms.Compose(
+ [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
+ )
+ trainset = MnistDataset(train_df, transform)
+ validset = MnistDataset(valid_df, transform)
+ train_loader = DataLoader(trainset, batch_size=batch_size)
+ valid_loader = DataLoader(validset, batch_size=batch_size)
+ total_batch = len(train_loader)
+
+ return (train_loader, valid_loader, total_batch)
+
+def get_mnist_avg(df):
+ return np.round(df.groupby('label').mean().mean(axis=1).values,2).tolist()
\ No newline at end of file
diff --git a/requirements.sh b/requirements.sh
new file mode 100644
index 0000000..27c36e4
--- /dev/null
+++ b/requirements.sh
@@ -0,0 +1 @@
+pip install fastapi==0.70.0 uvicorn==0.15.0 python-dotenv==0.19.1 sqlalchemy==1.4.25 psycopg2-binary==2.9.1 colorlog==6.5.0 pandas==1.3.3 xgboost==1.4.2 pyyaml==5.4.1 numpy==1.19.5 scikit-learn==1.0 prefect==0.15.6 mlflow==1.20.2 ray==1.7.0 tensorflow-data-validation==1.2.0 torch==1.10.0 torchvision==0.11.1 google-cloud-storage==1.42.3 hiredis==2.0.0 && pip uninstall -y tensorflow==2.6 && pip install tensorflow-cpu==2.4
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index ea0763c..729f816 100644
Binary files a/requirements.txt and b/requirements.txt differ
diff --git a/set_prefect.sh b/set_prefect.sh
new file mode 100755
index 0000000..ad365fb
--- /dev/null
+++ b/set_prefect.sh
@@ -0,0 +1,5 @@
+python /prefect/mnist/main.py
+python /prefect/atmos_tmp_pipeline/main.py
+python /prefect/insurance/main.py
+
+prefect agent local start
\ No newline at end of file