# Databricks notebook source
import json
import mlflow
import tempfile
import os
import wandb
import hydra
from omegaconf import DictConfig

_steps = [
    "download",
    "basic_cleaning",
    "data_check",
    "data_split",
    "train_random_forest",
    # NOTE: We do not include this in the steps so it is not run by mistake.
    # You first need to promote a model export to "prod" before you can run this,
    # then you need to run this step explicitly
    # "test_regression_model"
]
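
# A minimal sketch of the config.yaml that `@hydra.main(config_name='config')`
# loads below. Key names are taken from the lookups in this script; the values
# shown are illustrative assumptions, not the project's actual defaults:
#
#   main:
#     project_name: nyc_airbnb
#     experiment_name: development
#     steps: all
#     components_repository: https://github.com/udacity/Project-Build-an-ML-Pipeline-Starter#components
#   etl:
#     sample: sample1.csv
#     min_price: 10
#     max_price: 350
#   data_check:
#     kl_threshold: 0.2
#   modeling:
#     test_size: 0.2
#     val_size: 0.2
#     random_seed: 42
#     stratify_by: neighbourhood_group
#     max_tfidf_features: 5
#     random_forest:
#       n_estimators: 100
#       max_depth: 15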


# This automatically reads in the configuration
@hydra.main(config_name='config')
def go(config: DictConfig):
    # Set up the wandb experiment. All runs will be grouped under this name
    os.environ["WANDB_PROJECT"] = config["main"]["project_name"]
    os.environ["WANDB_RUN_GROUP"] = config["main"]["experiment_name"]

    # Steps to execute
    steps_par = config['main']['steps']
    active_steps = steps_par.split(",") if steps_par != "all" else _steps
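
    # For example (a sketch; this assumes the repository's top-level MLproject
    # exposes `steps` and `hydra_options` parameters, as in the Udacity starter):
    #
    #   mlflow run . -P steps="download,basic_cleaning"
    #   mlflow run . -P hydra_options="modeling.random_forest.n_estimators=50"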

    # Move to a temporary directory
    with tempfile.TemporaryDirectory() as tmp_dir:

        if "download" in active_steps:
            # Download file and load in W&B
            _ = mlflow.run(
                f"{config['main']['components_repository']}/get_data",
                "main",
                parameters={
                    "sample": config["etl"]["sample"],
                    "artifact_name": "sample.csv",
                    "artifact_type": "raw_data",
                    "artifact_description": "Raw file as downloaded"
                },
            )
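
        # get_data logs the raw file to W&B as `sample.csv` (see artifact_name
        # above); basic_cleaning below consumes it via the `sample.csv:latest` alias.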
if "basic_cleaning" in active_steps:
_ = mlflow.run(
os.path.join(hydra.utils.get_original_cwd(), "src", "basic_cleaning"),
"main",
parameters={
"input_artifact": "sample.csv:latest",
"output_artifact": "clean_sample.csv",
"output_type": "clean_sample",
"output_description": "Data with outliers and null values removed",
"min_price": config['etl']['min_price'],
"max_price": config['etl']['max_price']
},
)
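
        # basic_cleaning logs `clean_sample.csv`, which both data_check and
        # data_split below reference as `clean_sample.csv:latest`.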
if "data_check" in active_steps:
_ = mlflow.run(
os.path.join(hydra.utils.get_original_cwd(), "src", "data_check"),
"main",
parameters={
"csv": "clean_sample.csv:latest",
"ref": "clean_sample.csv:reference",
"kl_threshold": config["data_check"]["kl_threshold"],
"min_price": config['etl']['min_price'],
"max_price": config['etl']['max_price']
}
)
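
        # NOTE: no step here creates the `reference` alias. In the starter's
        # intended workflow you manually tag a vetted clean_sample.csv version
        # as `reference` in the W&B UI so data_check has a baseline for the
        # KL-divergence test.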
if "data_split" in active_steps:
_ = mlflow.run(
f"{config['main']['components_repository']}/train_val_test_split",
"main",
parameters={
"input": "clean_sample.csv:latest",
"test_size": config["modeling"]["test_size"],
"random_seed": config["modeling"]["random_seed"],
"stratify_by": config["modeling"]["stratify_by"]
}
)
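
        # train_val_test_split produces the `trainval_data.csv` and
        # `test_data.csv` artifacts consumed by the two steps below.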
if "train_random_forest" in active_steps:
# NOTE: we need to serialize the random forest configuration into JSON
rf_config = os.path.abspath("rf_config.json")
with open(rf_config, "w+") as fp:
json.dump(dict(config["modeling"]["random_forest"].items()), fp) # DO NOT TOUCH
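
            # rf_config.json ends up holding the flat random_forest section of
            # the config, e.g. (values here are illustrative assumptions):
            #   {"n_estimators": 100, "max_depth": 15, "min_samples_split": 4}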
            # NOTE: use the rf_config we just created as the rf_config parameter
            # for the train_random_forest step
            _ = mlflow.run(
                os.path.join(hydra.utils.get_original_cwd(), "src", "train_random_forest"),
                "main",
                parameters={
                    "trainval_artifact": "trainval_data.csv:latest",
                    "val_size": config["modeling"]["val_size"],
                    "random_seed": config["modeling"]["random_seed"],
                    "stratify_by": config["modeling"]["stratify_by"],
                    "rf_config": rf_config,
                    "max_tfidf_features": config["modeling"]["max_tfidf_features"],
                    "output_artifact": "random_forest_export"
                }
            )
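
        # The trained model is exported to W&B as `random_forest_export`. Tag
        # the version you want to ship with the `prod` alias (in the W&B UI)
        # before enabling the test step below, which pulls `random_forest_export:prod`.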
if "test_regression_model" in active_steps:
_ = mlflow.run(
f"{config['main']['components_repository']}/test_regression_model",
"main",
parameters={
"mlflow_model": "random_forest_export:prod",
"test_dataset": "test_data.csv:latest"
}
)


if __name__ == "__main__":
    go()