update exporter
Signed-off-by: Sunyanan Choochotkaew <[email protected]>
sunya-ch committed Sep 20, 2023
1 parent b6dad1e commit 26c0e3e
Showing 9 changed files with 364 additions and 197 deletions.
14 changes: 8 additions & 6 deletions cmd/main.py
@@ -407,10 +407,12 @@ def train(args):
         print_cols = ["feature_group", "model_name", "mae"]
         print("AbsPower pipeline results:")
         metadata_df = load_pipeline_metadata(pipeline.path, energy_source, ModelOutputType.AbsPower.name)
-        print(metadata_df.sort_values(by=[ERROR_KEY])[print_cols])
+        if metadata_df is not None:
+            print(metadata_df.sort_values(by=[ERROR_KEY])[print_cols])
         print("DynPower pipeline results:")
         metadata_df = load_pipeline_metadata(pipeline.path, energy_source, ModelOutputType.DynPower.name)
-        print(metadata_df.sort_values(by=[ERROR_KEY])[print_cols])
+        if metadata_df is not None:
+            print(metadata_df.sort_values(by=[ERROR_KEY])[print_cols])
 
         warnings.resetwarnings()

@@ -616,7 +618,7 @@ def _summary_plot(energy_source, summary_df, output_folder, name):
         sns.barplot(data=data, x="Feature Group", y="MAE", hue="Model", ax=ax)
         ax.set_title(component)
         ax.set_ylabel("MAE (Watt)")
-        ax.set_ylim((0, 50))
+        ax.set_ylim((0, 100))
         if i < col_num-1:
             ax.set_xlabel("")
         ax.legend(bbox_to_anchor=(1.05, 1.05))
@@ -671,7 +673,8 @@ def plot(args):
     from estimate import default_predicted_col_func
     from sklearn.preprocessing import MaxAbsScaler
 
-    best_result_map, power_labels_map, best_model_id_map, _ = estimate(args)
+    best_result_map, power_labels_map, best_model_id_map, summary_df = estimate(args)
+    print(summary_df)
     for energy_source, best_restult in best_result_map.items():
         best_restult = best_restult.reset_index()
         power_labels = power_labels_map[energy_source]
@@ -737,7 +740,7 @@ def export(args):
     machine_path = get_machine_path(output_path, args.version, machine_id)
 
     collect_date, _ = extract_time(args.benchmark)
-    exporter.export(pipeline_path, machine_path, version=args.version, publisher=args.publisher, collect_date=collect_date, include_raw=args.include_raw)
+    exporter.export(data_path, pipeline_path, machine_path, machine_id=machine_id, version=args.version, publisher=args.publisher, collect_date=collect_date, include_raw=args.include_raw)
 
     args.energy_source = ",".join(PowerSourceMap.keys())

@@ -839,7 +842,6 @@ def plot_scenario(args):
             data_filename = get_general_filename(args.target_data, energy_source, None, ot, args.extractor, args.isolator) + "_" + args.scenario
             _ts_plot(power_data, power_cols, "Power source: {} ({})".format(energy_source, args.scenario), output_folder, data_filename, ylabel="Power (W)")
 
-
 if __name__ == "__main__":
     # set model top path to data path
     os.environ['MODEL_PATH'] = data_path
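
The recurring guard added in `train()` exists because `load_pipeline_metadata` returns `None` when no metadata CSV has been written for a given energy source and output type. A minimal sketch of a helper that centralizes the check; the helper name `print_pipeline_results` is hypothetical and not part of this commit:

```python
# Hypothetical helper, assuming load_pipeline_metadata and ERROR_KEY behave
# as in this repository (None is returned when metadata is missing).
def print_pipeline_results(pipeline_path, energy_source, output_type_name):
    print_cols = ["feature_group", "model_name", "mae"]
    metadata_df = load_pipeline_metadata(pipeline_path, energy_source, output_type_name)
    if metadata_df is None:
        print("No metadata for {}/{}".format(energy_source, output_type_name))
        return
    print(metadata_df.sort_values(by=[ERROR_KEY])[print_cols])
```
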
321 changes: 146 additions & 175 deletions model_training/benchmark/stressng.yaml

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion model_training/script.sh
@@ -14,6 +14,7 @@ export VERSION=${VERSION-v0.6}
 export PIPELINE_PREFIX=${PIPELINE_PREFIX-"std_"}
 export CPE_DATAPATH=${CPE_DATAPATH-"$(pwd)/data"}
 export ENTRYPOINT_IMG=${ENTRYPOINT_IMG-"quay.io/sustainable_computing_io/kepler_model_server:v0.6"}
+export MODEL_PATH=$CPE_DATAPATH
 
 mkdir -p $HOME/bin
 export PATH=$HOME/bin:$PATH
@@ -180,7 +181,7 @@ function quick_collect() {
 }
 
 function train() {
-    train_model stressng_kepler_query,coremark_kepler_query,parsec_kepler_query ${VERSION}
+    train_model stressng_kepler_query ${VERSION}
 }
 
 function quick_train() {
20 changes: 17 additions & 3 deletions src/train/exporter/exporter.py
@@ -16,8 +16,9 @@
 from loader import load_csv, load_pipeline_metadata, get_model_group_path, load_metadata, load_train_args, get_preprocess_folder, get_general_filename
 from saver import WEIGHT_FILENAME, save_pipeline_metadata, save_train_args
 from format import time_to_str
+from writer import generate_pipeline_page, generate_validation_results, append_version_readme
 
-def export(pipeline_path, machine_path, version, publisher, collect_date, include_raw=False):
+def export(data_path, pipeline_path, machine_path, machine_id, version, publisher, collect_date, include_raw=False):
     if not validate_arguments(pipeline_path):
         return
 
@@ -47,7 +48,9 @@ def export(pipeline_path, machine_path, version, publisher, collect_date, include_raw=False):
 
     extractor = pipeline_metadata["extractor"]
     isolator = pipeline_metadata["isolator"]
+    mae_validated_df_map = dict()
     for energy_source in PowerSourceMap.keys():
+        mae_validated_df_map[energy_source] = dict()
         for ot in ModelOutputType:
             metadata_df = load_pipeline_metadata(pipeline_path, energy_source, ot.name)
             if metadata_df is None:
@@ -80,8 +83,19 @@ def export(pipeline_path, machine_path, version, publisher, collect_date, include_raw=False):
                 save_pipeline_metadata(out_pipeline_path, pipeline_metadata, energy_source, ot.name, mae_validated_df)
                 print("Exported models for {}/{}".format(energy_source, ot.name))
                 print(mae_validated_df)
+                mae_validated_df_map[energy_source][ot.name] = mae_validated_df
             else:
                 print("No valid models exported for {}/{}".format(energy_source, ot.name))
 
+
+    train_args = load_train_args(pipeline_path)
+    train_args["machine_id"] = machine_id
+
     # save train args
-    save_train_args(out_pipeline_path, load_train_args(pipeline_path))
+    save_train_args(out_pipeline_path, train_args)
+
+    # generate document
+    generate_pipeline_page(data_path, machine_path, train_args)
+    generate_validation_results(machine_path, train_args, mae_validated_df_map)
+    append_version_readme(machine_path, train_args, pipeline_metadata, include_raw)
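
For orientation, a hedged sketch of how the extended `export()` entry point is now driven from `cmd/main.py`. The concrete paths and identifiers below are placeholders, not values taken from the commit:

```python
import exporter

# Illustrative call only; all literal arguments are placeholders.
exporter.export(
    "/data",                    # data_path: where the benchmark CPE JSONs live
    "/data/std_v0.6",           # pipeline_path: trained pipeline to export
    "/models/v0.6/machine-01",  # machine_path: export destination
    machine_id="machine-01",    # newly required: recorded into the saved train args
    version="v0.6",
    publisher="example-user",
    collect_date="2023-09-20",
    include_raw=False,
)
```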


1 change: 0 additions & 1 deletion src/train/exporter/validator.py
@@ -6,7 +6,6 @@
 
 from loader import load_train_args
 from config import ERROR_KEY
-from train_types import PowerSourceMap
 
 required_benchmark = ["stressng_kepler_query"]
 
166 changes: 166 additions & 0 deletions src/train/exporter/writer.py
@@ -0,0 +1,166 @@
import os
import sys

import pandas as pd

util_path = os.path.join(os.path.dirname(__file__), '..', '..', 'util')
sys.path.append(util_path)

from loader import load_json, get_machine_path, default_init_model_url, get_url, trainers_with_weight
from config import ERROR_KEY
from train_types import ModelOutputType, FeatureGroup

def write_markdown(markdown_filepath, markdown_content):
    try:
        with open(markdown_filepath, "w", encoding="utf-8") as markdown_file:
            # Write the Markdown content to the file
            markdown_file.write(markdown_content)
            print(f"Markdown file '{markdown_filepath}' has been created successfully.")
    except IOError as e:
        print(f"Cannot write '{markdown_filepath}': {e}")

# Function to convert a dictionary to a Markdown table
def dict_to_markdown_table(data):
    # Get the column headers
    headers = list(data.keys())

    # Initialize the Markdown table with headers
    markdown_table = "| " + " | ".join(headers) + " |\n"
    markdown_table += "| " + " | ".join(["---"] * len(headers)) + " |\n"

    # Iterate through the dictionary and add rows to the table
    for i in range(len(data[headers[0]])):
        row = "| " + " | ".join([str(data[key][i]) for key in headers]) + " |\n"
        markdown_table += row

    return markdown_table

def format_cpe_content(data):
    spec = data["spec"]
    iterations = spec["iterationSpec"]["iterations"]
    items = dict()
    for iteration in iterations:
        items[iteration["name"]] = iteration["values"]
    df = pd.DataFrame(items)
    content = dict_to_markdown_table(df)
    content += "\nrepetition: {}".format(spec["repetition"])
    return content

def format_trainer(trainers):
    trainer_content = ""
    for trainer in trainers.split(","):
        trainer_content += " - {}\n".format(trainer)
    return trainer_content

def _version_path(machine_path):
    return os.path.join(machine_path, "..")

def generate_pipeline_page(data_path, machine_path, train_args, skip_if_exist=True):
    doc_path = os.path.join(_version_path(machine_path), ".doc")
    pipeline_name = train_args["pipeline_name"]
    markdown_filename = "{}.md".format(pipeline_name)
    markdown_filepath = os.path.join(doc_path, markdown_filename)
    if skip_if_exist and os.path.exists(markdown_filepath):
        print(f"Markdown file '{markdown_filepath}' already exists.")
        return

    workload_content = ""
    inputs = train_args["input"].split(",")
    for input in inputs:
        benchmark_name = "".join(input.split("_")[0:-2])
        data = load_json(data_path, benchmark_name)

        workload_content += """
### {}
<!-- put workload description here -->
<details>
{}
</details>
""".format(benchmark_name, format_cpe_content(data))

    markdown_content = """
# Pipeline {}
## Description
<!-- put pipeline description here -->
## Components
- **Extractor:** {}
- **Isolator:** {}
- **AbsPower Trainers:**
{}
- **DynPower Trainers:**
{}
## Workload information
{}
""".format(pipeline_name, train_args["extractor"], train_args["isolator"], format_trainer(train_args["abs_trainers"]), " (same as AbsPower Trainers)" if train_args["abs_trainers"] == train_args["dyn_trainers"] else format_trainer(train_args["dyn_trainers"]), workload_content)

    write_markdown(markdown_filepath, markdown_content)

def model_url(version, machine_id, pipeline_name, energy_source, output_type, feature_group, model_name, weight):
    machine_path = get_machine_path(default_init_model_url, version, machine_id, assure=False)
    model_url = get_url(ModelOutputType[output_type], FeatureGroup[feature_group], model_name=model_name, model_topurl=machine_path, energy_source=energy_source, pipeline_name=pipeline_name, weight=weight)
    return model_url

def format_error_content(train_args, mae_validated_df_map, weight):
    content = ""
    for energy_source, mae_validated_df_outputs in mae_validated_df_map.items():
        for output_type, mae_validated_df in mae_validated_df_outputs.items():
            content += "### {} {} model\n\n".format(energy_source, output_type)
            df = mae_validated_df
            if weight:
                df = mae_validated_df[mae_validated_df["model_name"].str.contains('|'.join(trainers_with_weight))]
            items = []
            min_err_rows = df.loc[df.groupby(["feature_group"])[ERROR_KEY].idxmin()]
            for _, row in min_err_rows.iterrows():
                item = dict()
                feature_group = row["feature_group"]
                model_name = row["model_name"]
                item["url"] = model_url(train_args["version"], train_args["machine_id"], train_args["pipeline_name"], energy_source, output_type, feature_group, model_name, weight)
                item[ERROR_KEY] = "{:.2f}".format(row[ERROR_KEY])
                item["feature group"] = feature_group
                item["model name"] = model_name
                items += [item]
            print_df = pd.DataFrame(items, columns=["feature group", "model name", ERROR_KEY, "url"])
            content += dict_to_markdown_table(print_df.sort_values(by=["feature group"]))
    return content

def generate_validation_results(machine_path, train_args, mae_validated_df_map):
    markdown_filepath = os.path.join(machine_path, "README.md")

    markdown_content = "# Validation results\n\n"
    markdown_content += "## With local estimator\n\n"
    markdown_content += format_error_content(train_args, mae_validated_df_map, weight=True)
    markdown_content += "## With sidecar estimator\n\n"
    markdown_content += format_error_content(train_args, mae_validated_df_map, weight=False)
    write_markdown(markdown_filepath, markdown_content)

def append_version_readme(machine_path, train_args, pipeline_metadata, include_raw):
    readme_path = os.path.join(_version_path(machine_path), "README.md")

    content_to_append = "{0}|[{1}](./.doc/{1}.md)|{2}|{3}|{4}|[{5}](https://github.com/{5})|[link](./{6}/README.md)\n".format(
        train_args["machine_id"],
        train_args["pipeline_name"],
        "&check;" if include_raw else "X",
        pipeline_metadata["collect_time"],
        pipeline_metadata["last_update_time"],
        pipeline_metadata["publisher"],
        train_args["machine_id"]
    )

    with open(readme_path, 'a') as file:
        file.write(content_to_append)
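
A quick usage sketch for `dict_to_markdown_table`: each key becomes a column header and the parallel value lists become rows. The sample values here are invented for illustration:

```python
# Sample input is invented; the function expects equal-length value lists.
sample = {
    "feature group": ["BPFOnly", "CounterOnly"],
    "model name": ["SGDRegressorTrainer_0", "GradientBoostingRegressorTrainer_0"],
    "mae": ["1.23", "4.56"],
}
print(dict_to_markdown_table(sample))
# | feature group | model name | mae |
# | --- | --- | --- |
# | BPFOnly | SGDRegressorTrainer_0 | 1.23 |
# | CounterOnly | GradientBoostingRegressorTrainer_0 | 4.56 |
```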
7 changes: 5 additions & 2 deletions src/train/pipeline.py
@@ -163,7 +163,7 @@ def save_metadata(self):
         all_metadata = get_all_metadata(model_toppath, self.name)
         for energy_source, model_type_metadata in all_metadata.items():
             for model_type, metadata_df in model_type_metadata.items():
-                metadata_df = metadata_df.sort_values(by=[ERROR_KEY])
+                metadata_df = metadata_df.sort_values(by=["feature_group", ERROR_KEY])
                 save_pipeline_metadata(self.path, self.metadata, energy_source, model_type, metadata_df)
 
     def print_pipeline_process_end(self, energy_source, feature_group, abs_data, dyn_data):
@@ -214,7 +214,10 @@ def initial_trainers(trainer_names, node_level, pipeline_name, target_energy_sou
         energy_components = PowerSourceMap[energy_source]
         for feature_group in valid_feature_groups:
             for trainer_name in trainer_names:
-                trainer_class = load_class("trainer", trainer_name)
+                try:
+                    trainer_class = load_class("trainer", trainer_name)
+                except:
+                    continue
                 trainer = trainer_class(energy_components, feature_group.name, energy_source, node_level, pipeline_name=pipeline_name)
                 trainers += [trainer]
     return trainers
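
The try/except around `load_class` makes `initial_trainers` tolerant of trainer names whose class cannot be imported: unknown names are now skipped instead of aborting pipeline construction. A hedged sketch of the same pattern; the logging and the explicit `Exception` type are illustrative additions, not what the commit does (it uses a bare `except`):

```python
# Sketch only: load_class is the repository's loader; the helper name and
# the printed message are assumptions for illustration.
def load_trainer_classes(trainer_names):
    trainer_classes = []
    for trainer_name in trainer_names:
        try:
            trainer_classes.append(load_class("trainer", trainer_name))
        except Exception as err:
            print("skip trainer {}: {}".format(trainer_name, err))
            continue
    return trainer_classes
```
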
27 changes: 19 additions & 8 deletions src/util/loader.py
@@ -24,6 +24,8 @@
 any_node_type = -1
 default_feature_group = FeatureGroup.KubeletOnly
 
+trainers_with_weight = ["SGDRegressorTrainer"]
+
 def load_json(path, name):
     if ".json" not in name:
         name = name + ".json"
@@ -236,20 +238,29 @@ def get_download_output_path(download_path, energy_source, output_type):
     energy_source_path = assure_path(os.path.join(download_path, energy_source))
     return os.path.join(energy_source_path, output_type.name)
 
-def get_url(output_type, feature_group=default_feature_group, trainer_name=default_trainer_name, node_type=default_node_type, model_topurl=default_init_model_url, energy_source="rapl", pipeline_name=default_init_pipeline_name):
+def get_url(output_type, feature_group=default_feature_group, trainer_name=default_trainer_name, node_type=default_node_type, model_topurl=default_init_model_url, energy_source="rapl", pipeline_name=default_init_pipeline_name, model_name=None, weight=False):
     group_path = get_model_group_path(model_topurl, output_type=output_type, feature_group=feature_group, energy_source=energy_source, pipeline_name=pipeline_name, assure=False)
-    model_name = get_model_name(trainer_name, node_type)
-    return os.path.join(group_path, model_name + ".zip")
+    if model_name is None:
+        model_name = get_model_name(trainer_name, node_type)
+    file_ext = ".zip"
+    if weight:
+        file_ext = ".json"
+    return os.path.join(group_path, model_name + file_ext)
 
-def get_pipeline_url(model_topurl=default_init_model_url, pipeline_name=default_init_pipeline_name):
-    return os.path.join(model_topurl, pipeline_name + ".zip")
+def get_pipeline_url(model_topurl=default_init_model_url, pipeline_name=default_init_pipeline_name, weight=False):
+    file_ext = ".zip"
+    if weight:
+        file_ext = ".json"
+    return os.path.join(model_topurl, pipeline_name + file_ext)
 
 def class_to_json(class_obj):
     return json.loads(json.dumps(class_obj.__dict__))
 
-def get_machine_path(output_path, version, machine_id):
+def get_machine_path(output_path, version, machine_id, assure=True):
     export_path = os.path.join(output_path, version, machine_id)
-    return assure_path(export_path)
+    if assure:
+        return assure_path(export_path)
+    return export_path
 
 def get_preprocess_folder(pipeline_path, assure=True):
     preprocess_folder = os.path.join(pipeline_path, PREPROCESS_FOLDERNAME)
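
With the new `model_name` and `weight` parameters, `get_url` can point at either a packaged model archive or a plain weight dump. A hedged usage sketch; the top URL and pipeline name are placeholders, and only the `.json`/`.zip` switch is the behavior added by this commit:

```python
from loader import get_url
from train_types import ModelOutputType, FeatureGroup

url = get_url(
    ModelOutputType.AbsPower,
    feature_group=FeatureGroup.BPFOnly,
    model_name="SGDRegressorTrainer_0",
    model_topurl="https://example.com/models",  # placeholder
    energy_source="rapl",
    pipeline_name="std_v0.6",                   # placeholder
    weight=True,
)
# url ends with "SGDRegressorTrainer_0.json"; with weight=False it would
# end with "SGDRegressorTrainer_0.zip".
```
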
2 changes: 1 addition & 1 deletion src/util/train_types.py
@@ -84,7 +84,7 @@ def deep_sort(elements):
     FeatureGroup.AcceleratorOnly: deep_sort(ACCELERATE_FEATURES),
 }
 
-SingleSourceFeatures = [FeatureGroup.CounterOnly.name, FeatureGroup.CgroupOnly.name, FeatureGroup.BPFOnly.name, FeatureGroup.KubeletOnly.name]
+SingleSourceFeatures = [FeatureGroup.CounterOnly.name, FeatureGroup.CgroupOnly.name, FeatureGroup.BPFOnly.name, FeatureGroup.BPFIRQ.name, FeatureGroup.KubeletOnly.name]
 
 def is_single_source_feature_group(fg):
     return fg.name in SingleSourceFeatures
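
A one-line check of the widened membership, as a sketch assuming `FeatureGroup` and `is_single_source_feature_group` are imported from `train_types`:

```python
from train_types import FeatureGroup, is_single_source_feature_group

# BPFIRQ is now treated as a single-source feature group alongside BPFOnly.
assert is_single_source_feature_group(FeatureGroup.BPFIRQ)
assert is_single_source_feature_group(FeatureGroup.BPFOnly)
```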
