diff --git a/cmd/main.py b/cmd/main.py index 508b21d5..d70ca2e8 100644 --- a/cmd/main.py +++ b/cmd/main.py @@ -18,8 +18,8 @@ src_path = os.path.join(os.path.dirname(__file__), '..', 'src') sys.path.append(src_path) -from util.prom_types import PROM_SERVER, PROM_QUERY_INTERVAL, PROM_QUERY_STEP, PROM_HEADERS, PROM_SSL_DISABLE -from util.prom_types import metric_prefix as KEPLER_METRIC_PREFIX, node_info_column, prom_responses_to_results, TIMESTAMP_COL, feature_to_query +from util.prom_types import PROM_SERVER, PROM_QUERY_INTERVAL, PROM_QUERY_STEP, PROM_HEADERS, PROM_SSL_DISABLE, PROM_THIRDPARTY_METRICS +from util.prom_types import metric_prefix as KEPLER_METRIC_PREFIX, node_info_column, prom_responses_to_results, TIMESTAMP_COL, feature_to_query, update_thirdparty_metrics from util.train_types import ModelOutputType, FeatureGroup, FeatureGroups, is_single_source_feature_group, SingleSourceFeatures from util.loader import load_json, DEFAULT_PIPELINE, load_pipeline_metadata, get_pipeline_path, get_model_group_path, list_pipelines, list_model_names, load_metadata, load_csv, get_machine_path, get_preprocess_folder, get_general_filename from util.saver import save_json, save_csv, save_train_args @@ -56,7 +56,7 @@ def summary_validation(validate_df): items = [] metric_to_validate_pod = { "cgroup": "kepler_container_cgroupfs_cpu_usage_us_total", - # "hwc": "kepler_container_cpu_instructions_total", + # "hwc": "kepler_container_cpu_instructions_total", "hwc": "kepler_container_cpu_cycles_total", "kubelet": "kepler_container_kubelet_cpu_usage_total", "bpf": "kepler_container_bpf_cpu_time_us_total", @@ -214,7 +214,15 @@ def query(args): print("Query from {}.".format(benchmark_filename)) start, end = extract_time(benchmark_filename) available_metrics = prom.all_metrics() - queries = [m for m in available_metrics if args.metric_prefix in m] + + queries = None + if args.thirdparty_metrics != "": + queries = [m for m in available_metrics if args.metric_prefix in m or m in args.thirdparty_metrics] + elif PROM_THIRDPARTY_METRICS != [""]: + queries = [m for m in available_metrics if args.metric_prefix in m or m in PROM_THIRDPARTY_METRICS] + else: + queries = [m for m in available_metrics if args.metric_prefix in m] + print("Start {} End {}".format(start, end)) response = _range_queries(prom, queries, start, end, args.step, None) save_json(path=data_path, name=args.output, data=response) @@ -235,7 +243,7 @@ def validate(args): summary_validation(validate_df) if args.output: save_csv(path=data_path, name=args.output, data=validate_df) - + def assert_train(trainer, data, energy_components): import pandas as pd node_types = pd.unique(data[node_info_column]) @@ -260,7 +268,7 @@ def get_isolator(isolator, profile, pipeline_name, target_hints, bg_hints): target_hints = target_hints.split(",") else: target_hints = [] - + if bg_hints: bg_hints = bg_hints.split(",") else: @@ -280,12 +288,12 @@ def get_isolator(isolator, profile, pipeline_name, target_hints, bg_hints): supported_isolator[profile_isolator.get_name()] = profile_isolator else: trainer_isolator = TrainIsolator(target_hints=target_hints, bg_hints=bg_hints, abs_pipeline_name=pipeline_name) - + supported_isolator[trainer_isolator.get_name()] = trainer_isolator if isolator not in supported_isolator: print("isolator {} is not supported. supported isolator: {}".format(isolator, supported_isolator.keys())) - return None + return None return supported_isolator[isolator] def get_extractor(extractor): @@ -312,7 +320,11 @@ def extract(args): input = args.input response = load_json(data_path, input) query_results = prom_responses_to_results(response) - + # Inject thirdparty_metrics to FeatureGroup + if args.thirdparty_metrics != "": + update_thirdparty_metrics(args.thirdparty_metrics) + elif PROM_THIRDPARTY_METRICS != [""]: + update_thirdparty_metrics(PROM_THIRDPARTY_METRICS) valid_fg = get_valid_feature_group_from_queries([query for query in query_results.keys() if len(query_results[query]) > 1 ]) ot, fg = check_ot_fg(args, valid_fg) if fg is None or ot is None: @@ -352,7 +364,7 @@ def train(args): pipeline_name = DEFAULT_PIPELINE if args.pipeline_name: - pipeline_name = args.pipeline_name + pipeline_name = args.pipeline_name inputs = args.input.split(",") energy_sources = args.energy_source.split(",") @@ -370,7 +382,7 @@ def train(args): valid_feature_groups = list(set(valid_feature_groups).intersection(set(valid_fg))) input_query_results_list += [query_results] - + abs_trainer_names = args.abs_trainers.split(",") dyn_trainer_names = args.dyn_trainers.split(",") pipeline = get_pipeline(pipeline_name, args.extractor, args.profile, args.target_hints, args.bg_hints, args.isolator, abs_trainer_names, dyn_trainer_names, energy_sources, valid_feature_groups) @@ -381,7 +393,7 @@ def train(args): energy_components = PowerSourceMap[energy_source] for feature_group in valid_feature_groups: success, abs_data, dyn_data = pipeline.process_multiple_query(input_query_results_list, energy_components, energy_source, feature_group=feature_group.name) - assert success, "failed to process pipeline {}".format(pipeline.name) + assert success, "failed to process pipeline {}".format(pipeline.name) for trainer in pipeline.trainers: if trainer.feature_group == feature_group and trainer.energy_source == energy_source: if trainer.node_level and abs_data is not None: @@ -397,7 +409,7 @@ def train(args): print("=========== Train {} Summary ============".format(energy_source)) - # save args + # save args argparse_dict = vars(args) save_train_args(pipeline.path, argparse_dict) print("Train args:", argparse_dict) @@ -431,10 +443,10 @@ def estimate(args): response = load_json(data_path, input) query_results = prom_responses_to_results(response) input_query_results_list += [query_results] - + valid_fg = get_valid_feature_group_from_queries([query for query in query_results.keys() if len(query_results[query]) > 1 ]) ot, fg = check_ot_fg(args, valid_fg) - if fg is not None: + if fg is not None: valid_fg = [fg] best_result_map = dict() @@ -526,7 +538,7 @@ def estimate(args): path_splits = best_model_path.split("/") best_model_id_map[energy_source] = "{} using {}".format(path_splits[-1], path_splits[-2]) return best_result_map, power_labels_map, best_model_id_map, pd.DataFrame(summary_items) - + def _ts_plot(data, cols, title, output_folder, name, labels=None, subtitles=None, ylabel=None): plot_height = 3 plot_width = 10 @@ -558,7 +570,7 @@ def _ts_plot(data, cols, title, output_folder, name, labels=None, subtitles=None def _feature_power_plot(data, model_id, output_type, energy_source, feature_cols, actual_power_cols, predicted_power_cols, output_folder, name): plot_height = 5 plot_width = 5 - + import matplotlib.pyplot as plt import seaborn as sns sns.set(font_scale=1.2) @@ -597,10 +609,10 @@ def _summary_plot(energy_source, summary_df, output_folder, name): if len(summary_df) == 0: print("no summary data to plot") return - + plot_height = 3 plot_width = 20 - + import matplotlib.pyplot as plt import seaborn as sns sns.set(font_scale=1.2) @@ -637,7 +649,7 @@ def plot(args): exit() valid_fg = [fg_key for fg_key in FeatureGroups.keys()] ot, fg = check_ot_fg(args, valid_fg) - if fg is not None: + if fg is not None: valid_fg = [fg] print("Plot:", args) energy_sources = args.energy_source.split(",") @@ -662,7 +674,7 @@ def plot(args): print("cannot load data from {}/{}".format(data_saved_path, data_filename)) continue feature_plot += [fg.name] - feature_cols = FeatureGroups[fg] + feature_cols = FeatureGroups[fg] power_cols = [col for col in data.columns if "power" in col] feature_data = data.groupby([TIMESTAMP_COL]).sum() _ts_plot(feature_data[feature_data[feature_cols]>0], feature_cols, "Feature group: {}".format(fg.name), output_folder, data_filename) @@ -725,7 +737,7 @@ def export(args): if args.output == default_output_filename: print("need to specify --output for /models path") exit() - output_path = args.output + output_path = args.output if not args.publisher: print("need to specify --publisher") @@ -735,7 +747,7 @@ def export(args): print("need to specify --benchmark to extract collection time") exit() - pipeline_name = args.pipeline_name + pipeline_name = args.pipeline_name machine_id = args.id pipeline_path = get_pipeline_path(data_path, pipeline_name=pipeline_name) machine_path = get_machine_path(output_path, args.version, machine_id) @@ -752,7 +764,7 @@ def export(args): args.target_data = "preprocess" args.output = get_preprocess_folder(machine_path) plot(args) - + # plot error args.target_data = "error" args.output = os.path.join(machine_path, "error_summary") @@ -767,7 +779,7 @@ def export(args): for feature_group in SingleSourceFeatures: args.feature_group = feature_group plot(args) - + def plot_scenario(args): if not args.benchmark: print("Need --benchmark") @@ -811,7 +823,7 @@ def plot_scenario(args): valid_fg = [fg_key for fg_key in FeatureGroups.keys()] ot, fg = check_ot_fg(args, valid_fg) - if fg is not None: + if fg is not None: valid_fg = [fg] energy_sources = args.energy_source.split(",") output_folder = os.path.join(data_path, args.output) @@ -834,7 +846,7 @@ def plot_scenario(args): extractor = DefaultExtractor() data, power_cols, _, _ = extractor.extract(query_results, energy_components, fg.name, args.energy_source, node_level=True) feature_plot += [fg.name] - feature_cols = FeatureGroups[fg] + feature_cols = FeatureGroups[fg] power_cols = [col for col in data.columns if "power" in col] feature_data = data.groupby([TIMESTAMP_COL]).sum() _ts_plot(feature_data, feature_cols, "Feature group: {} ({})".format(fg.name, args.scenario), output_folder, data_filename) @@ -860,6 +872,7 @@ def plot_scenario(args): parser.add_argument("--interval", type=int, help="Specify query interval.", default=PROM_QUERY_INTERVAL) parser.add_argument("--step", type=str, help="Specify query step.", default=PROM_QUERY_STEP) parser.add_argument("--metric-prefix", type=str, help="Specify metrix prefix to filter.", default=KEPLER_METRIC_PREFIX) + parser.add_argument("-tm", "--thirdparty-metrics", nargs='+', help="Specify the thirdparty metrics that not included by Kepler", default="") # Train arguments parser.add_argument("-p", "--pipeline-name", type=str, help="Specify pipeline name.") @@ -883,7 +896,7 @@ def plot_scenario(args): # Plot arguments parser.add_argument("--target-data", type=str, help="Speficy target plot data (preprocess, estimate)") parser.add_argument("--scenario", type=str, help="Speficy scenario") - + # Export arguments parser.add_argument("--id", type=str, help="specify machine id") parser.add_argument("--version", type=str, help="Specify model server version.", default=default_version) @@ -903,4 +916,4 @@ def plot_scenario(args): else: getattr(sys.modules[__name__], args.command)(args) - \ No newline at end of file + diff --git a/src/train/extractor/extractor.py b/src/train/extractor/extractor.py index 173ae194..67eeee00 100644 --- a/src/train/extractor/extractor.py +++ b/src/train/extractor/extractor.py @@ -290,4 +290,4 @@ def process_feature(self, features, feature_to_remove, feature_to_add): for feature in features: if feature not in feature_to_remove: new_features.append(feature) - return new_features + feature_to_add \ No newline at end of file + return new_features + feature_to_add diff --git a/src/util/prom_types.py b/src/util/prom_types.py index f3254eaf..c51058a7 100644 --- a/src/util/prom_types.py +++ b/src/util/prom_types.py @@ -1,7 +1,7 @@ from config import getConfig import pandas as pd -from train_types import SYSTEM_FEATURES, FeatureGroups, FeatureGroup, get_valid_feature_groups +from train_types import SYSTEM_FEATURES, WORKLOAD_FEATURES, FeatureGroups, FeatureGroup, deep_sort, get_valid_feature_groups PROM_SERVER = 'http://localhost:9090' PROM_SSL_DISABLE = 'True' PROM_HEADERS = '' @@ -15,6 +15,8 @@ PROM_QUERY_INTERVAL = getConfig('PROM_QUERY_INTERVAL', PROM_QUERY_INTERVAL) PROM_QUERY_STEP = getConfig('PROM_QUERY_STEP', PROM_QUERY_STEP) +PROM_THIRDPARTY_METRICS = getConfig('PROM_THIRDPARTY_METRICS', "").split(',') + metric_prefix = "kepler_" TIMESTAMP_COL = "timestamp" PACKAGE_COL = "package" @@ -47,11 +49,18 @@ def feature_to_query(feature): return "{}_{}".format(node_query_prefix, feature) if feature in FeatureGroups[FeatureGroup.AcceleratorOnly]: return "{}_{}".format(node_query_prefix, feature) + if FeatureGroup.ThirdParty in FeatureGroups is not None and feature in FeatureGroups[FeatureGroup.ThirdParty]: + return feature return "{}_{}_{}".format(container_query_prefix, feature, container_query_suffix) def energy_component_to_query(component): return "{}_{}_{}".format(node_query_prefix, component, node_query_suffix) +def update_thirdparty_metrics(metrics): + global FeatureGroups + FeatureGroups[FeatureGroup.ThirdParty] = metrics + FeatureGroups[FeatureGroup.WorkloadOnly] = deep_sort(WORKLOAD_FEATURES + metrics) + def get_valid_feature_group_from_queries(queries): all_workload_features = FeatureGroups[FeatureGroup.WorkloadOnly] features = [feature for feature in all_workload_features if feature_to_query(feature) in queries] @@ -94,4 +103,4 @@ def prom_responses_to_results(prom_responses): results = dict() for query_metric, prom_response in prom_responses.items(): results[query_metric] = generate_dataframe_from_response(query_metric, prom_response) - return results \ No newline at end of file + return results diff --git a/src/util/train_types.py b/src/util/train_types.py index a4fbb90b..b0fc8b18 100644 --- a/src/util/train_types.py +++ b/src/util/train_types.py @@ -50,6 +50,7 @@ class FeatureGroup(enum.Enum): Basic = 9 BPFIRQ = 10 AcceleratorOnly = 11 + ThirdParty = 12 Unknown = 99 class EnergyComponentLabelGroup(enum.Enum): @@ -198,4 +199,4 @@ def is_weight_output(output_type): random.shuffle(shuffled_features) get_group = get_feature_group(shuffled_features) assert get_group == g, "must be " + str(g) - assert get_feature_group([]) == FeatureGroup.Unknown, "must be unknown" \ No newline at end of file + assert get_feature_group([]) == FeatureGroup.Unknown, "must be unknown"