
Commit
Modified to use dataset and pipelinedata (#193)
* Modified to use dataset and pipelinedata

* Create diabetes dataset if no dataset specified

* Reverted null build_id check

* Added MSE tag

* Reverted logic for NULL Build ID

* Force new model if previous one has no metric tag

* Removed unused DATA variables

* Removed unused DATA vars

* Updated workaround for non-metric-tagged models

* Tidied up code
GraemeMalcolm authored Feb 14, 2020
1 parent e108e07 commit 88636df
Showing 7 changed files with 137 additions and 90 deletions.
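Taken together, the changes rewire the training pipeline around two Azure ML SDK v1 constructs: the registered Dataset feeds the train step as a named input, and a PipelineData directory carries the trained model from the train step to the register step. A minimal sketch of that pattern — the script names and compute target here are placeholders, not the repository's exact values:

    # Sketch of the Dataset + PipelineData wiring this commit adopts.
    # 'train.py', 'register.py', and 'cpu-cluster' are placeholders.
    from azureml.core import Dataset, Workspace
    from azureml.pipeline.core import Pipeline, PipelineData
    from azureml.pipeline.steps import PythonScriptStep

    ws = Workspace.from_config()
    dataset = Dataset.get_by_name(ws, 'diabetes_ds')

    # A PipelineData is a datastore-backed directory written by one step
    # and mounted for any downstream step that lists it as an input.
    model_dir = PipelineData('model_dir', datastore=ws.get_default_datastore())

    train = PythonScriptStep(
        name='Train', script_name='train.py', compute_target='cpu-cluster',
        inputs=[dataset.as_named_input('training_data')],
        outputs=[model_dir],
        arguments=['--step_output', model_dir])

    register = PythonScriptStep(
        name='Register', script_name='register.py', compute_target='cpu-cluster',
        inputs=[model_dir],
        arguments=['--step_input', model_dir])

    pipeline = Pipeline(workspace=ws, steps=[train, register])
    pipeline.validate()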
2 changes: 0 additions & 2 deletions .env.example
@@ -32,8 +32,6 @@ EVALUATE_SCRIPT_PATH = 'evaluate/evaluate_model.py'
 REGISTER_SCRIPT_PATH = 'register/register_model.py'
 SOURCES_DIR_TRAIN = 'diabetes_regression'
 DATASET_NAME = 'diabetes_ds'
-DATASTORE_NAME = 'datablobstore'
-DATAFILE_NAME = 'diabetes.csv'

 # Optional. Used by a training pipeline with R on Databricks
 DB_CLUSTER_ID = ''
19 changes: 6 additions & 13 deletions diabetes_regression/evaluate/evaluate_model.py
@@ -90,6 +90,7 @@
         help="Name of the Model",
         default="sklearn_regression_model.pkl",
     )
+
     parser.add_argument(
         "--allow_run_cancel",
         type=str,
@@ -122,18 +123,10 @@
         model_name, tag_name, exp.name, ws)

     if (model is not None):
-
-        production_model_run_id = model.run_id
-
-        # Get the run history for both production model and
-        # newly trained model and compare mse
-        production_model_run = Run(exp, run_id=production_model_run_id)
-        new_model_run = run.parent
-        print("Production model run is", production_model_run)
-
-        production_model_mse = \
-            production_model_run.get_metrics().get(metric_eval)
-        new_model_mse = new_model_run.get_metrics().get(metric_eval)
+        production_model_mse = 10000
+        if (metric_eval in model.tags):
+            production_model_mse = float(model.tags[metric_eval])
+        new_model_mse = float(run.parent.get_metrics().get(metric_eval))
         if (production_model_mse is None or new_model_mse is None):
             print("Unable to find", metric_eval, "metrics, "
                   "exiting evaluation")
@@ -151,7 +144,7 @@
             print("New trained model performs better, "
                   "thus it should be registered")
         else:
-            print("New trained model metric is less than or equal to "
+            print("New trained model metric is worse than or equal to "
                   "production model so skipping model registration.")
             if((allow_run_cancel).lower() == 'true'):
                 run.parent.cancel()
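The substantive change in evaluate_model.py is that the production model's MSE now travels with the model as a registry tag instead of being fetched from its training run's metrics. A minimal sketch of that comparison — not the repo's exact helper — assuming, as the commit message notes, that a model registered before tagging existed may lack the tag and should then lose to the new model:

    # Sketch of the tag-based metric comparison introduced above.
    from azureml.core import Run
    from azureml.core.model import Model

    def should_register(new_run: Run, production: Model,
                        metric: str = 'mse') -> bool:
        # Models registered before MSE tagging get a deliberately bad
        # sentinel score, so the newly trained model always wins.
        production_mse = 10000.0
        if metric in production.tags:
            production_mse = float(production.tags[metric])
        new_mse = float(new_run.get_metrics().get(metric))
        return new_mse < production_mse  # lower MSE is better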
83 changes: 55 additions & 28 deletions diabetes_regression/register/register_model.py
@@ -27,6 +27,7 @@
 import sys
 import argparse
 import traceback
+import joblib
 from azureml.core import Run, Experiment, Workspace
 from azureml.core.model import Model as AMLModel

@@ -63,17 +64,24 @@ def main():
         type=str,
         help="The Build ID of the build triggering this pipeline run",
     )
+
     parser.add_argument(
         "--run_id",
         type=str,
         help="Training run ID",
     )
+
     parser.add_argument(
         "--model_name",
         type=str,
         help="Name of the Model",
         default="sklearn_regression_model.pkl",
     )
+    parser.add_argument(
+        "--step_input",
+        type=str,
+        help=("input from previous steps")
+    )

     args = parser.parse_args()
     if (args.build_id is not None):
@@ -83,18 +91,42 @@
     if (run_id == 'amlcompute'):
         run_id = run.parent.id
     model_name = args.model_name
+    model_path = args.step_input

-    if (build_id is None):
-        register_aml_model(model_name, exp, run_id)
-    else:
-        run.tag("BuildId", value=build_id)
-        builduri_base = os.environ.get("BUILDURI_BASE")
-        if (builduri_base is not None):
-            build_uri = builduri_base + build_id
-            run.tag("BuildUri", value=build_uri)
-            register_aml_model(model_name, exp, run_id, build_id, build_uri)
-        else:
-            register_aml_model(model_name, exp, run_id, build_id)
+    # load the model
+    print("Loading model from " + model_path)
+    model_file = os.path.join(model_path, model_name)
+    model = joblib.load(model_file)
+    model_mse = run.parent.get_metrics()["mse"]
+
+    if (model is not None):
+        if (build_id is None):
+            register_aml_model(model_file, model_name, model_mse, exp, run_id)
+        else:
+            run.tag("BuildId", value=build_id)
+            builduri_base = os.environ.get("BUILDURI_BASE")
+            if (builduri_base is not None):
+                build_uri = builduri_base + build_id
+                run.tag("BuildUri", value=build_uri)
+                register_aml_model(
+                    model_file,
+                    model_name,
+                    model_mse,
+                    exp,
+                    run_id,
+                    build_id,
+                    build_uri)
+            else:
+                register_aml_model(
+                    model_file,
+                    model_name,
+                    model_mse,
+                    exp,
+                    run_id,
+                    build_id)
+    else:
+        print("Model not found. Skipping model registration.")
+        sys.exit(0)


 def model_already_registered(model_name, exp, run_id):
@@ -109,35 +141,30 @@ def model_already_registered(model_name, exp, run_id):


 def register_aml_model(
+    model_path,
     model_name,
+    model_mse,
     exp,
     run_id,
     build_id: str = 'none',
     build_uri=None
 ):
     try:
+        tagsValue = {"area": "diabetes_regression",
+                     "run_id": run_id,
+                     "experiment_name": exp.name,
+                     "mse": model_mse}
         if (build_id != 'none'):
             model_already_registered(model_name, exp, run_id)
-            run = Run(experiment=exp, run_id=run_id)
-            tagsValue = {"area": "diabetes_regression",
-                         "BuildId": build_id, "run_id": run_id,
-                         "experiment_name": exp.name}
+            tagsValue["BuildId"] = build_id
             if (build_uri is not None):
                 tagsValue["BuildUri"] = build_uri
-        else:
-            run = Run(experiment=exp, run_id=run_id)
-            if (run is not None):
-                tagsValue = {"area": "diabetes_regression",
-                             "run_id": run_id, "experiment_name": exp.name}
-            else:
-                print("A model run for experiment", exp.name,
-                      "matching properties run_id =", run_id,
-                      "was not found. Skipping model registration.")
-                sys.exit(0)

-        model = run.register_model(model_name=model_name,
-                                   model_path="./outputs/" + model_name,
-                                   tags=tagsValue)
+        model = AMLModel.register(
+            workspace=exp.workspace,
+            model_name=model_name,
+            model_path=model_path,
+            tags=tagsValue)
+        os.chdir("..")
         print(
             "Model registered: {} \nModel Description: {} "
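register_model.py now loads the model file out of the PipelineData directory passed as --step_input and registers it workspace-wide with Model.register, attaching the MSE as the tag the next evaluation run reads. A condensed sketch of that flow; the workspace handle and the logged 'mse' metric are assumed:

    # Condensed sketch of the new registration flow, not the file verbatim.
    import os
    import joblib
    from azureml.core import Workspace
    from azureml.core.model import Model

    def register_from_step_input(ws: Workspace, step_input: str,
                                 model_name: str, mse: float) -> Model:
        model_file = os.path.join(step_input, model_name)
        joblib.load(model_file)  # fail fast if the artifact is unreadable
        return Model.register(
            workspace=ws,
            model_name=model_name,
            model_path=model_file,
            tags={'area': 'diabetes_regression', 'mse': str(mse)})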
42 changes: 20 additions & 22 deletions diabetes_regression/training/train.py
@@ -24,10 +24,8 @@
 POSSIBILITY OF SUCH DAMAGE.
 """
 from azureml.core.run import Run
-from azureml.core import Dataset
 import os
 import argparse
-from sklearn.datasets import load_diabetes
 from sklearn.linear_model import Ridge
 from sklearn.metrics import mean_squared_error
 from sklearn.model_selection import train_test_split
@@ -65,19 +63,20 @@ def main():
     )

     parser.add_argument(
-        "--dataset_name",
+        "--step_output",
         type=str,
-        help=("Dataset with the training data")
+        help=("output for passing data to next step")
     )
+
     args = parser.parse_args()

     print("Argument [build_id]: %s" % args.build_id)
     print("Argument [model_name]: %s" % args.model_name)
-    print("Argument [dataset_name]: %s" % args.dataset_name)
+    print("Argument [step_output]: %s" % args.step_output)

     model_name = args.model_name
     build_id = args.build_id
-    dataset_name = args.dataset_name
+    step_output_path = args.step_output

     print("Getting training parameters")

@@ -91,15 +90,17 @@
     print("Parameter alpha: %s" % alpha)

     run = Run.get_context()
-    ws = run.experiment.workspace

-    if (dataset_name):
-        dataset = Dataset.get_by_name(workspace=ws, name=dataset_name)
+    # Get the dataset
+    dataset = run.input_datasets['training_data']
+    if (dataset):
         df = dataset.to_pandas_dataframe()
         X = df.values
         y = df.Y
     else:
-        X, y = load_diabetes(return_X_y=True)
+        e = ("No dataset provided")
+        print(e)
+        raise Exception(e)

     X_train, X_test, y_train, y_test = train_test_split(
         X, y, test_size=0.2, random_state=0)
@@ -108,21 +109,18 @@

     reg = train_model(run, data, alpha)

-    joblib.dump(value=reg, filename=model_name)
-
-    # upload model file explicitly into artifacts for parent run
-    run.parent.upload_file(name="./outputs/" + model_name,
-                           path_or_stream=model_name)
-    print("Uploaded the model {} to experiment {}".format(
-        model_name, run.experiment.name))
-    dirpath = os.getcwd()
-    print(dirpath)
-    print("Following files are uploaded ")
-    print(run.parent.get_file_names())
+    # Pass model file to next step
+    os.makedirs(step_output_path, exist_ok=True)
+    model_output_path = os.path.join(step_output_path, model_name)
+    joblib.dump(value=reg, filename=model_output_path)

-    run.parent.tag("BuildId", value=build_id)
+    # Also upload model file to run outputs for history
+    os.makedirs('outputs', exist_ok=True)
+    output_path = os.path.join('outputs', model_name)
+    joblib.dump(value=reg, filename=output_path)
+
+    # Add properties to identify this specific training run
+    run.parent.tag("BuildId", value=build_id)
     run.tag("BuildId", value=build_id)
     run.tag("run_type", value="train")
     builduri_base = os.environ.get("BUILDURI_BASE")
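On the training side, the dataset now arrives through the step's named input and the fitted model is written twice: into the PipelineData directory for the register step, and into ./outputs so the run history keeps its own copy. A stripped-down sketch of that I/O contract; the model file name and alpha value are illustrative:

    # Stripped-down sketch of the train step's I/O contract. Assumes the
    # step was wired with inputs=[dataset.as_named_input('training_data')]
    # and outputs=[pipeline_data], as in the pipeline definition below.
    import argparse
    import os
    import joblib
    from azureml.core.run import Run
    from sklearn.linear_model import Ridge

    parser = argparse.ArgumentParser()
    parser.add_argument('--step_output', type=str, default='step_output')
    args = parser.parse_args()

    run = Run.get_context()
    df = run.input_datasets['training_data'].to_pandas_dataframe()
    X, y = df.drop(columns=['Y']).values, df.Y.values  # features vs. label

    model = Ridge(alpha=0.5).fit(X, y)

    os.makedirs(args.step_output, exist_ok=True)
    joblib.dump(model, os.path.join(args.step_output, 'model.pkl'))  # next step
    os.makedirs('outputs', exist_ok=True)
    joblib.dump(model, os.path.join('outputs', 'model.pkl'))  # run history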
7 changes: 5 additions & 2 deletions docs/getting_started.md
@@ -75,6 +75,10 @@ the BASE_NAME value should not exceed 10 characters and it should contain number

 The **RESOURCE_GROUP** parameter is used as the name for the resource group that will hold the Azure resources for the solution. If providing an existing AML Workspace, set this value to the corresponding resource group name.

+The **WORKSPACE_SVC_CONNECTION** parameter is used to reference a service connection for the Azure ML workspace. You will create this after provisioning the workspace (we recommend using the IaC pipeline as described below), and installing the Azure ML extension in your Azure DevOps project.
+
+Optionally, a **DATASET_NAME** parameter can be used to reference a training dataset that you have registered in your Azure ML workspace (more details below).
+
 Make sure to select the **Allow access to all pipelines** checkbox in the
 variable group configuration.

@@ -125,8 +129,7 @@ Check out the newly created resources in the [Azure Portal](https://portal.azure

 (Optional) To remove the resources created for this project you can use the [/environment_setup/iac-remove-environment.yml](../environment_setup/iac-remove-environment.yml) definition or you can just delete the resource group in the [Azure Portal](https://portal.azure.com).

-**Note:** The training ML pipeline uses a [sample diabetes dataset](https://scikit-learn.org/stable/modules/generated/sklearn.datasets.load_diabetes.html) as training data. If you want to use your own dataset, you need to [create and register a datastore](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-access-data#azure-machine-learning-studio) in your ML workspace and upload the datafile (e.g. [diabetes.csv](./data/diabetes.csv)) to the corresponding blob container. You can also define a datastore in the ML Workspace with [az cli](https://docs.microsoft.com/en-us/cli/azure/ext/azure-cli-ml/ml/datastore?view=azure-cli-latest#ext-azure-cli-ml-az-ml-datastore-attach-blob).
-You'll also need to configure DATASTORE_NAME and DATAFILE_NAME variables in ***devopsforai-aml-vg*** variable group.
+**Note:** The training ML pipeline uses a [sample diabetes dataset](https://scikit-learn.org/stable/modules/generated/sklearn.datasets.load_diabetes.html) as training data. To use your own data, you need to [create a Dataset](https://docs.microsoft.com/azure/machine-learning/how-to-create-register-datasets) in your workspace and specify its name in a DATASET_NAME variable in the ***devopsforai-aml-vg*** variable group. You will also need to modify the test cases in the **ml_service/util/smoke_test_scoring_service.py** script to match the schema of the training features in your dataset.

 ## Create an Azure DevOps Azure ML Workspace Service Connection

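For the DATASET_NAME route described in the note above, registering your own tabular data up front looks roughly like this; the file name, datastore path, and workspace config are placeholders for your own values:

    # Rough sketch of registering custom training data as an Azure ML Dataset;
    # 'my-diabetes.csv' and 'training-data/' are placeholders.
    from azureml.core import Dataset, Workspace

    ws = Workspace.from_config()
    datastore = ws.get_default_datastore()
    datastore.upload_files(files=['my-diabetes.csv'],
                           target_path='training-data/', overwrite=True)
    dataset = Dataset.Tabular.from_delimited_files(
        path=(datastore, 'training-data/my-diabetes.csv'))
    dataset.register(workspace=ws, name='diabetes_ds',  # match DATASET_NAME
                     create_new_version=True)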
64 changes: 51 additions & 13 deletions ml_service/pipelines/diabetes_regression_build_train_pipeline.py
@@ -1,11 +1,14 @@
 from azureml.pipeline.core.graph import PipelineParameter
 from azureml.pipeline.steps import PythonScriptStep
-from azureml.pipeline.core import Pipeline
+from azureml.pipeline.core import Pipeline, PipelineData
 from azureml.core import Workspace, Environment
 from azureml.core.runconfig import RunConfiguration
-from azureml.core import Dataset, Datastore
+from azureml.core import Dataset
 from ml_service.util.attach_compute import get_compute
 from ml_service.util.env_variables import Env
+from sklearn.datasets import load_diabetes
+import pandas as pd
+import os


 def main():
@@ -45,26 +48,59 @@ def main():
     build_id_param = PipelineParameter(
         name="build_id", default_value=e.build_id)

-    dataset_name = ""
-    if (e.datastore_name is not None and e.datafile_name is not None):
-        dataset_name = e.dataset_name
-        datastore = Datastore.get(aml_workspace, e.datastore_name)
-        data_path = [(datastore, e.datafile_name)]
-        dataset = Dataset.Tabular.from_delimited_files(path=data_path)
-        dataset.register(workspace=aml_workspace,
-                         name=e.dataset_name,
-                         description="dataset with training data",
-                         create_new_version=True)
+    # Get dataset name
+    dataset_name = e.dataset_name
+
+    # Check to see if dataset exists
+    if (dataset_name not in aml_workspace.datasets):
+        # Create dataset from diabetes sample data
+        sample_data = load_diabetes()
+        df = pd.DataFrame(
+            data=sample_data.data,
+            columns=sample_data.feature_names)
+        df['Y'] = sample_data.target
+        file_name = 'diabetes.csv'
+        df.to_csv(file_name, index=False)
+
+        # Upload file to default datastore in workspace
+        default_ds = aml_workspace.get_default_datastore()
+        target_path = 'training-data/'
+        default_ds.upload_files(
+            files=[file_name],
+            target_path=target_path,
+            overwrite=True,
+            show_progress=False)
+
+        # Register dataset
+        path_on_datastore = os.path.join(target_path, file_name)
+        dataset = Dataset.Tabular.from_delimited_files(
+            path=(default_ds, path_on_datastore))
+        dataset = dataset.register(
+            workspace=aml_workspace,
+            name=dataset_name,
+            description='diabetes training data',
+            tags={'format': 'CSV'},
+            create_new_version=True)
+
+    # Get the dataset
+    dataset = Dataset.get_by_name(aml_workspace, dataset_name)
+
+    # Create a PipelineData to pass data between steps
+    pipeline_data = PipelineData(
+        'pipeline_data',
+        datastore=aml_workspace.get_default_datastore())

     train_step = PythonScriptStep(
         name="Train Model",
         script_name=e.train_script_path,
         compute_target=aml_compute,
         source_directory=e.sources_directory_train,
+        inputs=[dataset.as_named_input('training_data')],
+        outputs=[pipeline_data],
         arguments=[
             "--build_id", build_id_param,
             "--model_name", model_name_param,
-            "--dataset_name", dataset_name,
+            "--step_output", pipeline_data
         ],
         runconfig=run_config,
         allow_reuse=False,
@@ -91,9 +127,11 @@
         script_name=e.register_script_path,
         compute_target=aml_compute,
         source_directory=e.sources_directory_train,
+        inputs=[pipeline_data],
         arguments=[
             "--build_id", build_id_param,
             "--model_name", model_name_param,
+            "--step_input", pipeline_data,
         ],
         runconfig=run_config,
         allow_reuse=False,
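Because the pipeline script now registers the sample dataset on first run, a quick way to confirm the Dataset side of this wiring before triggering a build is to resolve it by name and preview a few rows. This check is illustrative, not part of the repo:

    # Illustrative check that the registered dataset resolves as expected.
    from azureml.core import Dataset, Workspace

    ws = Workspace.from_config()
    ds = Dataset.get_by_name(ws, 'diabetes_ds')  # DATASET_NAME from .env
    print(ds.take(3).to_pandas_dataframe())      # includes the 'Y' label column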