Merge pull request #215 from sunya-ch/trainer
Add curvefit trainer (fix minor issue, update test data)
rootfs authored Jan 17, 2024
2 parents a5b54df + 854eb50 commit 055f537
Showing 23 changed files with 355 additions and 45 deletions.
5 changes: 4 additions & 1 deletion .gitignore
@@ -140,5 +140,8 @@ tests/download/*
 */*/*/.DS_Store
 
 src/models
-tests/data/*_output
+tests/data/extractor_output
+tests/data/isolator_output
+tests/data/offline_trainer_output
+tests/data/plot_output
 model_training/*data*
2 changes: 1 addition & 1 deletion README.md
@@ -17,7 +17,7 @@ OPTS="ESTIMATOR SERVER" make deploy
 ```
 
 ## Model training
-- [Use Tekton pipeline](./tekton)
+- [Use Tekton pipeline](./model_training/tekton/)
 - [Use Bash script with CPE operator](./model_training/)
 
 ## Local test
19 changes: 19 additions & 0 deletions src/abs-train-pipelinerun.yaml
@@ -0,0 +1,19 @@
apiVersion: tekton.dev/v1
kind: PipelineRun
metadata:
  name: example-abs-train-pipeline
spec:
  timeouts:
    pipeline: "6h"
    tasks: "5h50m"
  workspaces:
    - name: mnt
      persistentVolumeClaim:
        claimName: task-pvc
  params:
    - name: PIPELINE_NAME
      value: AbsPowerTrainPipelineExample
    - name: OUTPUT_TYPE
      value: AbsPower
  pipelineRef:
    name: single-train-pipeline
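For context (not part of the commit): a run like this would typically be started with `kubectl apply -f src/abs-train-pipelinerun.yaml` and followed with `tkn pipelinerun logs example-abs-train-pipeline -f`, assuming Tekton Pipelines is installed and the referenced `single-train-pipeline` and `task-pvc` objects already exist in the cluster.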
56 changes: 56 additions & 0 deletions src/estimate/model/curvefit_model.py
@@ -0,0 +1,56 @@
import os
import sys
cur_path = os.path.join(os.path.dirname(__file__), '.')
sys.path.append(cur_path)

from estimate_common import transform_and_predict, load_model_by_pickle, load_model_by_json, is_component_model
from util.train_types import main_feature, get_valid_feature_groups

src_path = os.path.join(os.path.dirname(__file__), '..', '..')
sys.path.append(src_path)

from util import ModelOutputType

import collections.abc

class CurveFitModel():
    def __init__(self, model_path, model_name, output_type, model_file, features, fe_files, component_init=False, feature_group=None):
        self.name = model_name
        self.features = features
        if feature_group is None:
            self.feature_group = get_valid_feature_groups(features)[0]
        else:
            self.feature_group = feature_group
        self.output_type = ModelOutputType[output_type]

        # a component-type model bundles one curve-fit model per energy component,
        # loaded from the JSON model metadata
        self.comp_type = not component_init and is_component_model(model_file)
        if self.comp_type:
            self.models = dict()
            model_info = load_model_by_json(model_path, model_file)
            for comp, model_metadata in model_info.items():
                model = CurveFitModel(model_path, self.name, self.output_type.name, model_metadata['model_file'], model_metadata['features'], model_metadata['fe_files'], component_init=True)
                feature_index = main_feature(self.feature_group.name, comp)
                model.model.set_feature_index(feature_index)
                self.models[comp] = model
        else:
            self.model = load_model_by_pickle(model_path, model_file)
            self.fe_list = []
            for fe_filename in fe_files:
                self.fe_list += [load_model_by_pickle(model_path, fe_filename)]

    def get_power(self, request):
        if self.comp_type:
            results = dict()
            for comp, model in self.models.items():
                y, msg = transform_and_predict(model, request)
                if msg != "":
                    return [], msg
                if not isinstance(y, collections.abc.Sequence):
                    y = [y]
                results[comp] = y
            return results, msg
        else:
            return transform_and_predict(self, request)
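For orientation (an inference from the code above, not something stated in the commit): a component-type model's `get_power` returns a dict mapping each energy component to a sequence of predicted power values together with an error-message string, while a single model returns the result of `transform_and_predict` for the whole request directly.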

4 changes: 3 additions & 1 deletion src/estimate/model/model.py
@@ -16,12 +16,14 @@
 
 from scikit_model import ScikitModel
 from xgboost_model import XgboostModel
+from curvefit_model import CurveFitModel
 # from keras_model import KerasModel
 
 # model wrapper
 MODELCLASS = {
     'scikit': ScikitModel,
-    'xgboost': XgboostModel
+    'xgboost': XgboostModel,
+    'curvefit': CurveFitModel
     # 'keras': KerasModel,
 }
 
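The `MODELCLASS` registry maps a model-class key to its wrapper class. A minimal sketch of how such a registry is typically consumed — the variable values here (`model_path`, `model_file`, and friends) are illustrative assumptions, not part of the diff:

```python
# pick the wrapper class by key, then construct it with the model metadata;
# the constructor arguments mirror CurveFitModel.__init__ above
model_cls = MODELCLASS["curvefit"]  # -> CurveFitModel
model = model_cls(model_path, model_name, "AbsPower", model_file, features, fe_files)
```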
5 changes: 4 additions & 1 deletion src/train/isolator/train_isolator.py
@@ -45,7 +45,10 @@ def get_abs_models(workload_feature_cols, energy_source, toppath=model_toppath,
     for model_name in model_names:
         model_path = os.path.join(group_path, model_name)
         model = load_model(model_path)
-        abs_models += [model]
+        if model is not None:
+            abs_models += [model]
+        else:
+            print("Model is none: ", model_path)
     return abs_models
 
 # extracted_power_labels: sum of power labels over sorted timestamp for each energy_components
13 changes: 3 additions & 10 deletions src/train/pipeline.py
@@ -53,22 +53,15 @@ def __init__(self, name, trainers, extractor, isolator):
         self.metadata["init_time"] = time_to_str(datetime.datetime.utcnow())
 
     def get_abs_data(self, query_results, energy_components, feature_group, energy_source, aggr):
-        extracted_data, power_labels, _, features = self.extractor.extract(query_results, energy_components, feature_group, energy_source, node_level=True, aggr=aggr)
-        self.process_accelerator_feature(features)
+        extracted_data, power_labels, _, _ = self.extractor.extract(query_results, energy_components, feature_group, energy_source, node_level=True, aggr=aggr)
         return extracted_data, power_labels
 
     def get_dyn_data(self, query_results, energy_components, feature_group, energy_source):
-        extracted_data, power_labels, _, features = self.extractor.extract(query_results, energy_components, feature_group, energy_source, node_level=False)
+        extracted_data, power_labels, _, _ = self.extractor.extract(query_results, energy_components, feature_group, energy_source, node_level=False)
         if extracted_data is None or power_labels is None:
             return None
-        self.process_accelerator_feature(features)
         isolated_data = self.isolator.isolate(extracted_data, label_cols=power_labels, energy_source=energy_source)
-        return isolated_data
-
-    def process_accelerator_feature(self, features):
-        if features is not None and len(features) != 0:
-            for trainer in self.trainers:
-                trainer.features = features
+        return isolated_data
 
     def prepare_data(self, input_query_results, energy_components, energy_source, feature_group, aggr=True):
         query_results = input_query_results.copy()
21 changes: 21 additions & 0 deletions src/train/trainer/ExponentialRegressionTrainer/main.py
@@ -0,0 +1,21 @@
import os
import sys
trainer_path = os.path.join(os.path.dirname(__file__), '..')
sys.path.append(trainer_path)

from trainer.curvefit import CurveFitTrainer, CurveFitModel

import numpy as np

def expo_func(x, a, b, c):
    y = a*np.exp(b*x) + c
    return y

class ExponentialRegressionTrainer(CurveFitTrainer):

    def __init__(self, energy_components, feature_group, energy_source, node_level, pipeline_name):
        super(ExponentialRegressionTrainer, self).__init__(energy_components, feature_group, energy_source, node_level, pipeline_name=pipeline_name)
        self.fe_files = []

    def init_model(self):
        return CurveFitModel(expo_func)
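As a quick illustration (a sketch, not part of the commit), `expo_func` can be fitted directly with `scipy.optimize.curve_fit` on synthetic data; the constants below are arbitrary:

```python
import numpy as np
from scipy.optimize import curve_fit

rng = np.random.default_rng(0)
x = np.linspace(0, 1, 50)
# synthetic target with a=2.0, b=1.5, c=1.0 plus small noise
y = 2.0 * np.exp(1.5 * x) + 1.0 + rng.normal(0, 0.05, x.shape)
popt, _ = curve_fit(expo_func, x, y, maxfev=20000)
print(popt)  # should land close to [2.0, 1.5, 1.0]
```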
28 changes: 28 additions & 0 deletions src/train/trainer/LogarithmicRegressionTrainer/main.py
@@ -0,0 +1,28 @@
import os
import sys
trainer_path = os.path.join(os.path.dirname(__file__), '..')
sys.path.append(trainer_path)

from trainer.curvefit import CurveFitTrainer, CurveFitModel

import numpy as np

def p0_func(x, y):
    print(y.max(), y.min())
    a = y.max()-y.min()
    b = 1
    c = y.min()
    return [a, b, c]

def log_func(x, a, b, c):
    y = [a * np.log(b*xi) + c if b*xi > 0 and a * np.log(b*xi) > 0 else c for xi in x]
    return y

class LogarithmicRegressionTrainer(CurveFitTrainer):

    def __init__(self, energy_components, feature_group, energy_source, node_level, pipeline_name):
        super(LogarithmicRegressionTrainer, self).__init__(energy_components, feature_group, energy_source, node_level, pipeline_name=pipeline_name)
        self.fe_files = []

    def init_model(self):
        return CurveFitModel(log_func, p0_func=p0_func)
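A similar sketch (not part of the commit) for the logarithmic fit; since `log_func` is piecewise, seeding the optimizer via `p0_func` keeps `curve_fit` from starting in a flat region:

```python
import numpy as np
from scipy.optimize import curve_fit

x = np.linspace(0.01, 1, 50)
# generate targets with log_func itself so the piecewise clamp is respected
y = np.array(log_func(x, 3.0, 2.0, 5.0))
popt, _ = curve_fit(log_func, x, y, p0=p0_func(x, y), maxfev=20000)
print(popt)
```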
27 changes: 27 additions & 0 deletions src/train/trainer/LogisticRegressionTrainer/main.py
@@ -0,0 +1,27 @@
import os
import sys
trainer_path = os.path.join(os.path.dirname(__file__), '..')
sys.path.append(trainer_path)

from trainer.curvefit import CurveFitTrainer, CurveFitModel

import numpy as np

def p0_func(x, y):
    A = y.max() - y.min()  # value range
    x0 = 0.5  # sigmoid mid point (as normalized value is in 0 to 1, start mid point = 0.5)
    k = A/np.std(y)  # growth rate (larger std, lower growth)
    off = y.min()  # initial offset
    return [A, x0, k, off]

def logi_func(x, A, x0, k, off):
    return A / (1 + np.exp(-k*(x-x0))) + off

class LogisticRegressionTrainer(CurveFitTrainer):

    def __init__(self, energy_components, feature_group, energy_source, node_level, pipeline_name):
        super(LogisticRegressionTrainer, self).__init__(energy_components, feature_group, energy_source, node_level, pipeline_name=pipeline_name)
        self.fe_files = []

    def init_model(self):
        return CurveFitModel(logi_func, p0_func=p0_func)
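And a matching sketch (not part of the commit) for the logistic fit, with x normalized to [0, 1] as the mid-point comment in `p0_func` assumes:

```python
import numpy as np
from scipy.optimize import curve_fit

rng = np.random.default_rng(0)
x = np.linspace(0, 1, 100)  # normalized feature values
y = logi_func(x, 4.0, 0.5, 10.0, 1.0) + rng.normal(0, 0.02, x.shape)
popt, _ = curve_fit(logi_func, x, y, p0=p0_func(x, y), maxfev=20000)
print(popt)  # should recover roughly [4.0, 0.5, 10.0, 1.0]
```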
5 changes: 4 additions & 1 deletion src/train/trainer/__init__.py
@@ -11,6 +11,7 @@
 from util import assure_path, ModelOutputType, FeatureGroups, FeatureGroup,save_json, save_metadata, load_metadata, save_scaler, save_weight
 
 from util.prom_types import node_info_column
+from util.train_types import main_feature
 from util.extract_types import component_to_col, get_unit_vals, ratio_to_col
 from util.loader import get_model_group_path, get_save_path, get_model_name, get_archived_file, CHECKPOINT_FOLDERNAME, load_scaler
 from util.config import model_toppath
@@ -39,7 +40,6 @@ def __init__(self, model_class, energy_components, feature_group, energy_source,
         self.features = FeatureGroups[self.feature_group]
         self.energy_source = energy_source
         self.node_level = node_level
-
         self.trainer_name = self.__class__.__name__
         self.model_class = model_class
         self.output_type = ModelOutputType.AbsPower if node_level else ModelOutputType.DynPower
@@ -124,6 +124,9 @@ def load_model(self, node_type):
                 # init if failed to load any checkpoint
                 self.node_models[node_type][component] = self.init_model()
                 self.print_log("Newly initialize model ({})".format(component))
+            if hasattr(self.node_models[node_type][component], "set_feature_index"):
+                feature_index = main_feature(self.feature_group_name, component)
+                self.node_models[node_type][component].set_feature_index(feature_index)
 
     def process(self, data, power_labels, pipeline_lock):
         node_types = pd.unique(data[node_info_column])
135 changes: 135 additions & 0 deletions src/train/trainer/curvefit.py
@@ -0,0 +1,135 @@
from sklearn.metrics import mean_absolute_error
from sklearn.exceptions import NotFittedError
import numpy as np
from scipy.optimize import curve_fit
import os
import sys

util_path = os.path.join(os.path.dirname(__file__), '..', '..', 'util')
sys.path.append(util_path)

from util import save_pkl, load_pkl
from util.train_types import main_feature, FeatureGroup

from . import Trainer

model_class = "curvefit"

def get_save_path(model_filepath):
    return "/".join(model_filepath.split("/")[0:-1])

class CurveFitModel():
    def __init__(self, fit_func, p0_func=None):
        self.fit_func = fit_func
        self.popt = None
        self.pcov = None
        self.feature_index = None
        self.p0_func = p0_func

    def set_feature_index(self, feature_index):
        self.feature_index = feature_index

    def _x_values(self, X_values):
        return np.array(X_values[:,self.feature_index]).flatten()

    def fit(self, X_values, y_values):
        flatten_x = self._x_values(X_values)
        flatten_y = np.array(y_values).flatten()
        if self.p0_func is not None:
            self.popt, self.pcov = curve_fit(self.fit_func, flatten_x, flatten_y, p0=self.p0_func(flatten_x, flatten_y), maxfev=20000)
        else:
            self.popt, self.pcov = curve_fit(self.fit_func, flatten_x, flatten_y, maxfev=20000)

    def predict(self, X_values):
        if self.popt is None:
            raise NotFittedError("Model must be fit first")
        flatten_x = self._x_values(X_values)
        return np.array(self.fit_func(flatten_x, *self.popt))

# curvefit will focus on only a single feature. default is the first feature in the feature group.
class CurveFitTrainer(Trainer):
    def __init__(self, energy_components, feature_group, energy_source, node_level, pipeline_name, scaler_type="maxabs"):
        super(CurveFitTrainer, self).__init__(model_class, energy_components, feature_group, energy_source, node_level, pipeline_name, scaler_type=scaler_type)
        self.fe_files = []

    def train(self, node_type, component, X_values, y_values):
        try:
            if hasattr(self, 'fe'):
                for index in range(len(self.fe)):
                    X_values = self.fe[index].fit_transform(X_values)
            model = self.node_models[node_type][component]
            if component == "package":
                dram_index = main_feature(self.feature_group_name, "dram")
                if model.feature_index != dram_index:
                    dram_values = np.array(X_values[:,dram_index]).flatten()
                    # collect row indices (not raw values) where the dram feature is near zero
                    zero_dram_indices = [i for i, value in enumerate(dram_values) if value < 0.1]
                    X_values = [list(row) for i, row in enumerate(X_values) if i not in zero_dram_indices]
                    y_values = [row for i, row in enumerate(y_values) if i not in zero_dram_indices]
                    X_values = np.array(X_values)
            model.fit(X_values, y_values)
        except Exception as err:
            print("Train error", err)
            import traceback
            traceback.print_exc()

    def save_checkpoint(self, model, filepath):
        if hasattr(self, 'fe'):
            save_path = get_save_path(filepath)
            for index in range(len(self.fe)):
                save_pkl(save_path, self.fe_files[index], self.fe[index])
        save_pkl("", filepath, model)

    def load_local_checkpoint(self, filepath):
        if hasattr(self, 'fe_files'):
            save_path = get_save_path(filepath)
            for index in range(len(self.fe_files)):
                loaded_fe = load_pkl(save_path, self.fe_files[index])
                if loaded_fe is not None:
                    self.fe[index] = loaded_fe
        loaded_model = load_pkl("", filepath)
        return loaded_model, loaded_model is not None

    def should_archive(self, node_type):
        return True

    def get_basic_metadata(self, node_type):
        return dict()

    def get_mae(self, node_type, component, X_test, y_test):
        predicted_values = self.predict(node_type, component, X_test, skip_preprocess=True)
        mae = mean_absolute_error(y_test, predicted_values)
        return mae

    def get_mape(self, node_type, component, X_test, y_test):
        y_test = list(y_test)
        predicted_values = self.predict(node_type, component, X_test, skip_preprocess=True)
        non_zero_predicted_values = np.array([predicted_values[i] for i in range(len(predicted_values)) if y_test[i] > 0])
        if len(non_zero_predicted_values) == 0:
            return -1
        non_zero_y_test = np.array([y for y in y_test if y > 0])
        absolute_percentage_errors = np.abs((non_zero_y_test - non_zero_predicted_values) / non_zero_y_test) * 100
        mape = np.mean(absolute_percentage_errors)
        return mape

    def save_model(self, component_save_path, node_type, component):
        model = self.node_models[node_type][component]
        filepath = os.path.join(component_save_path, component)
        self.save_checkpoint(model, filepath)

    def component_model_filename(self, component):
        return component + ".pkl"

    def get_weight_dict(self, node_type):
        weight_dict = dict()

        for component, model in self.node_models[node_type].items():
            scaler = self.node_scalers[node_type]
            weight_dict[component] = {
                "All_Weights": {
                    "Categorical_Variables": dict(),
                    "Numerical_Variables": {self.features[i]:
                        {"scale": scaler.scale_[i]} for i in range(len(self.features))},
                    "CurveFit_Weights": list(model.popt)
                }
            }
        return weight_dict
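To make the single-feature behaviour of `CurveFitModel` concrete, a small standalone sketch (assuming a two-column input where only column 0 drives the target; `linear_func` is a made-up fit function, not from the commit):

```python
import numpy as np

def linear_func(x, a, b):
    return a * x + b

model = CurveFitModel(linear_func)
model.set_feature_index(0)   # fit against column 0 only

X = np.random.rand(100, 2)   # column 1 is ignored by _x_values
y = 3.0 * X[:, 0] + 2.0
model.fit(X, y)
print(model.popt)            # should be close to [3.0, 2.0]
pred = model.predict(X)
```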