Support third party metrics in model server
End users may have specific metrics that are not included
in Kepler, so this patch enhances the model server to collect
third-party metrics.

Signed-off-by: LeiZhou-97 <[email protected]>
LeiZhou-97 committed Nov 6, 2023
1 parent 9f670d2 commit d74879b
Showing 4 changed files with 56 additions and 33 deletions.
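The selection logic added to the query command can be summarized as follows (a minimal sketch, not part of the diff below; the metric names are purely illustrative): metric names containing the Kepler prefix are always queried, and any names supplied via the new --thirdparty-metrics flag (or, failing that, the PROM_THIRDPARTY_METRICS config entry) are queried as well.

    # Illustrative sketch of the new metric selection in query(); names are hypothetical.
    available_metrics = ["kepler_container_bpf_cpu_time_us_total", "my_app_requests_total"]
    thirdparty_metrics = ["my_app_requests_total"]  # e.g. from --thirdparty-metrics or PROM_THIRDPARTY_METRICS
    queries = [m for m in available_metrics if "kepler_" in m or m in thirdparty_metrics]
    # -> both the Kepler metric and the third-party metric are range-queried from Prometheus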
71 changes: 42 additions & 29 deletions cmd/main.py
@@ -18,8 +18,8 @@
src_path = os.path.join(os.path.dirname(__file__), '..', 'src')
sys.path.append(src_path)

from util.prom_types import PROM_SERVER, PROM_QUERY_INTERVAL, PROM_QUERY_STEP, PROM_HEADERS, PROM_SSL_DISABLE
from util.prom_types import metric_prefix as KEPLER_METRIC_PREFIX, node_info_column, prom_responses_to_results, TIMESTAMP_COL, feature_to_query
from util.prom_types import PROM_SERVER, PROM_QUERY_INTERVAL, PROM_QUERY_STEP, PROM_HEADERS, PROM_SSL_DISABLE, PROM_THIRDPARTY_METRICS
from util.prom_types import metric_prefix as KEPLER_METRIC_PREFIX, node_info_column, prom_responses_to_results, TIMESTAMP_COL, feature_to_query, update_thirdparty_metrics
from util.train_types import ModelOutputType, FeatureGroup, FeatureGroups, is_single_source_feature_group, SingleSourceFeatures
from util.loader import load_json, DEFAULT_PIPELINE, load_pipeline_metadata, get_pipeline_path, get_model_group_path, list_pipelines, list_model_names, load_metadata, load_csv, get_machine_path, get_preprocess_folder, get_general_filename
from util.saver import save_json, save_csv, save_train_args
@@ -56,7 +56,7 @@ def summary_validation(validate_df):
items = []
metric_to_validate_pod = {
"cgroup": "kepler_container_cgroupfs_cpu_usage_us_total",
# "hwc": "kepler_container_cpu_instructions_total",
# "hwc": "kepler_container_cpu_instructions_total",
"hwc": "kepler_container_cpu_cycles_total",
"kubelet": "kepler_container_kubelet_cpu_usage_total",
"bpf": "kepler_container_bpf_cpu_time_us_total",
@@ -214,7 +214,15 @@ def query(args):
print("Query from {}.".format(benchmark_filename))
start, end = extract_time(benchmark_filename)
available_metrics = prom.all_metrics()
queries = [m for m in available_metrics if args.metric_prefix in m]

queries = None
if args.thirdparty_metrics != "":
queries = [m for m in available_metrics if args.metric_prefix in m or m in args.thirdparty_metrics]
elif PROM_THIRDPARTY_METRICS != [""]:
queries = [m for m in available_metrics if args.metric_prefix in m or m in PROM_THIRDPARTY_METRICS]
else:
queries = [m for m in available_metrics if args.metric_prefix in m]

print("Start {} End {}".format(start, end))
response = _range_queries(prom, queries, start, end, args.step, None)
save_json(path=data_path, name=args.output, data=response)
@@ -235,7 +243,7 @@ def validate(args):
summary_validation(validate_df)
if args.output:
save_csv(path=data_path, name=args.output, data=validate_df)

def assert_train(trainer, data, energy_components):
import pandas as pd
node_types = pd.unique(data[node_info_column])
@@ -260,7 +268,7 @@ def get_isolator(isolator, profile, pipeline_name, target_hints, bg_hints):
target_hints = target_hints.split(",")
else:
target_hints = []

if bg_hints:
bg_hints = bg_hints.split(",")
else:
@@ -280,12 +288,12 @@ def get_isolator(isolator, profile, pipeline_name, target_hints, bg_hints):
supported_isolator[profile_isolator.get_name()] = profile_isolator
else:
trainer_isolator = TrainIsolator(target_hints=target_hints, bg_hints=bg_hints, abs_pipeline_name=pipeline_name)

supported_isolator[trainer_isolator.get_name()] = trainer_isolator

if isolator not in supported_isolator:
print("isolator {} is not supported. supported isolator: {}".format(isolator, supported_isolator.keys()))
return None
return supported_isolator[isolator]

def get_extractor(extractor):
@@ -312,7 +320,11 @@ def extract(args):
input = args.input
response = load_json(data_path, input)
query_results = prom_responses_to_results(response)

# Inject thirdparty_metrics to FeatureGroup
if args.thirdparty_metrics != "":
update_thirdparty_metrics(args.thirdparty_metrics)
elif PROM_THIRDPARTY_METRICS != [""]:
update_thirdparty_metrics(PROM_THIRDPARTY_METRICS)
valid_fg = get_valid_feature_group_from_queries([query for query in query_results.keys() if len(query_results[query]) > 1 ])
ot, fg = check_ot_fg(args, valid_fg)
if fg is None or ot is None:
@@ -352,7 +364,7 @@ def train(args):

pipeline_name = DEFAULT_PIPELINE
if args.pipeline_name:
pipeline_name = args.pipeline_name

inputs = args.input.split(",")
energy_sources = args.energy_source.split(",")
@@ -370,7 +382,7 @@ def train(args):
valid_feature_groups = list(set(valid_feature_groups).intersection(set(valid_fg)))
input_query_results_list += [query_results]


abs_trainer_names = args.abs_trainers.split(",")
dyn_trainer_names = args.dyn_trainers.split(",")
pipeline = get_pipeline(pipeline_name, args.extractor, args.profile, args.target_hints, args.bg_hints, args.isolator, abs_trainer_names, dyn_trainer_names, energy_sources, valid_feature_groups)
@@ -381,7 +393,7 @@ def train(args):
energy_components = PowerSourceMap[energy_source]
for feature_group in valid_feature_groups:
success, abs_data, dyn_data = pipeline.process_multiple_query(input_query_results_list, energy_components, energy_source, feature_group=feature_group.name)
assert success, "failed to process pipeline {}".format(pipeline.name)
for trainer in pipeline.trainers:
if trainer.feature_group == feature_group and trainer.energy_source == energy_source:
if trainer.node_level and abs_data is not None:
@@ -397,7 +409,7 @@ def train(args):


print("=========== Train {} Summary ============".format(energy_source))
# save args
argparse_dict = vars(args)
save_train_args(pipeline.path, argparse_dict)
print("Train args:", argparse_dict)
@@ -431,10 +443,10 @@ def estimate(args):
response = load_json(data_path, input)
query_results = prom_responses_to_results(response)
input_query_results_list += [query_results]

valid_fg = get_valid_feature_group_from_queries([query for query in query_results.keys() if len(query_results[query]) > 1 ])
ot, fg = check_ot_fg(args, valid_fg)
if fg is not None:
valid_fg = [fg]

best_result_map = dict()
@@ -526,7 +538,7 @@ def estimate(args):
path_splits = best_model_path.split("/")
best_model_id_map[energy_source] = "{} using {}".format(path_splits[-1], path_splits[-2])
return best_result_map, power_labels_map, best_model_id_map, pd.DataFrame(summary_items)

def _ts_plot(data, cols, title, output_folder, name, labels=None, subtitles=None, ylabel=None):
plot_height = 3
plot_width = 10
@@ -558,7 +570,7 @@ def _ts_plot(data, cols, title, output_folder, name, labels=None, subtitles=None
def _feature_power_plot(data, model_id, output_type, energy_source, feature_cols, actual_power_cols, predicted_power_cols, output_folder, name):
plot_height = 5
plot_width = 5

import matplotlib.pyplot as plt
import seaborn as sns
sns.set(font_scale=1.2)
@@ -597,10 +609,10 @@ def _summary_plot(energy_source, summary_df, output_folder, name):
if len(summary_df) == 0:
print("no summary data to plot")
return

plot_height = 3
plot_width = 20

import matplotlib.pyplot as plt
import seaborn as sns
sns.set(font_scale=1.2)
@@ -637,7 +649,7 @@ def plot(args):
exit()
valid_fg = [fg_key for fg_key in FeatureGroups.keys()]
ot, fg = check_ot_fg(args, valid_fg)
if fg is not None:
valid_fg = [fg]
print("Plot:", args)
energy_sources = args.energy_source.split(",")
@@ -662,7 +674,7 @@ def plot(args):
print("cannot load data from {}/{}".format(data_saved_path, data_filename))
continue
feature_plot += [fg.name]
feature_cols = FeatureGroups[fg]
power_cols = [col for col in data.columns if "power" in col]
feature_data = data.groupby([TIMESTAMP_COL]).sum()
_ts_plot(feature_data[feature_data[feature_cols]>0], feature_cols, "Feature group: {}".format(fg.name), output_folder, data_filename)
@@ -725,7 +737,7 @@ def export(args):
if args.output == default_output_filename:
print("need to specify --output for /models path")
exit()
output_path = args.output

if not args.publisher:
print("need to specify --publisher")
@@ -735,7 +747,7 @@ def export(args):
print("need to specify --benchmark to extract collection time")
exit()

pipeline_name = args.pipeline_name
machine_id = args.id
pipeline_path = get_pipeline_path(data_path, pipeline_name=pipeline_name)
machine_path = get_machine_path(output_path, args.version, machine_id)
@@ -752,7 +764,7 @@ def export(args):
args.target_data = "preprocess"
args.output = get_preprocess_folder(machine_path)
plot(args)

# plot error
args.target_data = "error"
args.output = os.path.join(machine_path, "error_summary")
@@ -767,7 +779,7 @@ def export(args):
for feature_group in SingleSourceFeatures:
args.feature_group = feature_group
plot(args)

def plot_scenario(args):
if not args.benchmark:
print("Need --benchmark")
@@ -811,7 +823,7 @@ def plot_scenario(args):

valid_fg = [fg_key for fg_key in FeatureGroups.keys()]
ot, fg = check_ot_fg(args, valid_fg)
if fg is not None:
valid_fg = [fg]
energy_sources = args.energy_source.split(",")
output_folder = os.path.join(data_path, args.output)
@@ -834,7 +846,7 @@ def plot_scenario(args):
extractor = DefaultExtractor()
data, power_cols, _, _ = extractor.extract(query_results, energy_components, fg.name, args.energy_source, node_level=True)
feature_plot += [fg.name]
feature_cols = FeatureGroups[fg]
power_cols = [col for col in data.columns if "power" in col]
feature_data = data.groupby([TIMESTAMP_COL]).sum()
_ts_plot(feature_data, feature_cols, "Feature group: {} ({})".format(fg.name, args.scenario), output_folder, data_filename)
@@ -860,6 +872,7 @@ def plot_scenario(args):
parser.add_argument("--interval", type=int, help="Specify query interval.", default=PROM_QUERY_INTERVAL)
parser.add_argument("--step", type=str, help="Specify query step.", default=PROM_QUERY_STEP)
parser.add_argument("--metric-prefix", type=str, help="Specify metrix prefix to filter.", default=KEPLER_METRIC_PREFIX)
parser.add_argument("-tm", "--thirdparty-metrics", nargs='+', help="Specify the thirdparty metrics that not included by Kepler", default="")

# Train arguments
parser.add_argument("-p", "--pipeline-name", type=str, help="Specify pipeline name.")
@@ -883,7 +896,7 @@ def plot_scenario(args):
# Plot arguments
parser.add_argument("--target-data", type=str, help="Speficy target plot data (preprocess, estimate)")
parser.add_argument("--scenario", type=str, help="Speficy scenario")

# Export arguments
parser.add_argument("--id", type=str, help="specify machine id")
parser.add_argument("--version", type=str, help="Specify model server version.", default=default_version)
@@ -903,4 +916,4 @@ def plot_scenario(args):
else:
getattr(sys.modules[__name__], args.command)(args)


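A note on the sentinel used in the query() changes above: this is plain Python string-splitting behavior, not something the patch defines. When PROM_THIRDPARTY_METRICS is unset, getConfig returns "" and "".split(',') yields [""], so the code compares against [""] rather than an empty list.

    # Why query() and extract() check PROM_THIRDPARTY_METRICS != [""]
    assert "".split(',') == [""]                                         # unset config -> the [""] sentinel
    assert "metric_a,metric_b".split(',') == ["metric_a", "metric_b"]    # configured case (illustrative names)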
2 changes: 1 addition & 1 deletion src/train/extractor/extractor.py
@@ -290,4 +290,4 @@ def process_feature(self, features, feature_to_remove, feature_to_add):
for feature in features:
if feature not in feature_to_remove:
new_features.append(feature)
return new_features + feature_to_add
13 changes: 11 additions & 2 deletions src/util/prom_types.py
@@ -1,7 +1,7 @@

from config import getConfig
import pandas as pd
from train_types import SYSTEM_FEATURES, FeatureGroups, FeatureGroup, get_valid_feature_groups
from train_types import SYSTEM_FEATURES, WORKLOAD_FEATURES, FeatureGroups, FeatureGroup, deep_sort, get_valid_feature_groups
PROM_SERVER = 'http://localhost:9090'
PROM_SSL_DISABLE = 'True'
PROM_HEADERS = ''
@@ -15,6 +15,8 @@
PROM_QUERY_INTERVAL = getConfig('PROM_QUERY_INTERVAL', PROM_QUERY_INTERVAL)
PROM_QUERY_STEP = getConfig('PROM_QUERY_STEP', PROM_QUERY_STEP)

PROM_THIRDPARTY_METRICS = getConfig('PROM_THIRDPARTY_METRICS', "").split(',')

metric_prefix = "kepler_"
TIMESTAMP_COL = "timestamp"
PACKAGE_COL = "package"
@@ -47,11 +49,18 @@ def feature_to_query(feature):
return "{}_{}".format(node_query_prefix, feature)
if feature in FeatureGroups[FeatureGroup.AcceleratorOnly]:
return "{}_{}".format(node_query_prefix, feature)
if FeatureGroup.ThirdParty in FeatureGroups and feature in FeatureGroups[FeatureGroup.ThirdParty]:
return feature
return "{}_{}_{}".format(container_query_prefix, feature, container_query_suffix)

def energy_component_to_query(component):
return "{}_{}_{}".format(node_query_prefix, component, node_query_suffix)

def update_thirdparty_metrics(metrics):
global FeatureGroups
FeatureGroups[FeatureGroup.ThirdParty] = metrics
FeatureGroups[FeatureGroup.WorkloadOnly] = deep_sort(WORKLOAD_FEATURES + metrics)

def get_valid_feature_group_from_queries(queries):
all_workload_features = FeatureGroups[FeatureGroup.WorkloadOnly]
features = [feature for feature in all_workload_features if feature_to_query(feature) in queries]
@@ -94,4 +103,4 @@ def prom_responses_to_results(prom_responses):
results = dict()
for query_metric, prom_response in prom_responses.items():
results[query_metric] = generate_dataframe_from_response(query_metric, prom_response)
return results
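As a usage illustration of the new helper (a hedged sketch, not part of the patch: it assumes the repository's src/util/ layout is importable as shown, and the metric name is hypothetical), registering a third-party metric makes feature_to_query return the name verbatim instead of building a kepler_-prefixed container query:

    import os
    import sys

    # Assumption: run from the repository root; the util modules import each other
    # by bare module name, so src/util is added to the import path here.
    sys.path.append(os.path.join("src", "util"))

    from prom_types import feature_to_query, update_thirdparty_metrics

    update_thirdparty_metrics(["my_app_requests_total"])  # hypothetical metric not exported by Kepler
    print(feature_to_query("my_app_requests_total"))      # -> 'my_app_requests_total', returned as-is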
3 changes: 2 additions & 1 deletion src/util/train_types.py
@@ -50,6 +50,7 @@ class FeatureGroup(enum.Enum):
Basic = 9
BPFIRQ = 10
AcceleratorOnly = 11
ThirdParty = 12
Unknown = 99

class EnergyComponentLabelGroup(enum.Enum):
@@ -198,4 +199,4 @@ def is_weight_output(output_type):
random.shuffle(shuffled_features)
get_group = get_feature_group(shuffled_features)
assert get_group == g, "must be " + str(g)
assert get_feature_group([]) == FeatureGroup.Unknown, "must be unknown"
