diff --git a/cmd/README.md b/cmd/README.md index 9a10485f..d70f8464 100644 --- a/cmd/README.md +++ b/cmd/README.md @@ -2,63 +2,6 @@ Use kepler model server function as a standalone docker container. -``` -usage: main.py [-h] [-i INPUT] [-o OUTPUT] [-s SERVER] [--interval INTERVAL] [--step STEP] [--metric-prefix METRIC_PREFIX] [-p PIPELINE_NAME] [--extractor EXTRACTOR] [--isolator ISOLATOR] [--profile PROFILE] [--target-hints TARGET_HINTS] [--bg-hints BG_HINTS] - [-e ENERGY_SOURCE] [--abs-trainers ABS_TRAINERS] [--dyn-trainers DYN_TRAINERS] [--benchmark BENCHMARK] [-ot OUTPUT_TYPE] [-fg FEATURE_GROUP] [--model-name MODEL_NAME] [--target-data TARGET_DATA] [--scenario SCENARIO] [--id ID] [--version VERSION] - [--publisher PUBLISHER] [--include-raw INCLUDE_RAW] - command - -Kepler model server entrypoint - -positional arguments: - command The command to execute. - -optional arguments: - -h, --help show this help message and exit - -i INPUT, --input INPUT - Specify input file/folder name. - -o OUTPUT, --output OUTPUT - Specify output file/folder name - -s SERVER, --server SERVER - Specify prometheus server. - --interval INTERVAL Specify query interval. - --step STEP Specify query step. - --metric-prefix METRIC_PREFIX - Specify metrix prefix to filter. - -p PIPELINE_NAME, --pipeline-name PIPELINE_NAME - Specify pipeline name. - --extractor EXTRACTOR - Specify extractor name (default, smooth). - --isolator ISOLATOR Specify isolator name (none, min, profile, trainer). - --profile PROFILE Specify profile input (required for trainer and profile isolator). - --target-hints TARGET_HINTS - Specify dynamic workload container name hints (used by TrainIsolator) - --bg-hints BG_HINTS Specify background workload container name hints (used by TrainIsolator) - -e ENERGY_SOURCE, --energy-source ENERGY_SOURCE - Specify energy source. - --abs-trainers ABS_TRAINERS - Specify trainer names (use comma(,) as delimiter). - --dyn-trainers DYN_TRAINERS - Specify trainer names (use comma(,) as delimiter). - --benchmark BENCHMARK - Specify benchmark file name. - -ot OUTPUT_TYPE, --output-type OUTPUT_TYPE - Specify output type (AbsPower or DynPower) for energy estimation. - -fg FEATURE_GROUP, --feature-group FEATURE_GROUP - Specify target feature group for energy estimation. - --model-name MODEL_NAME - Specify target model name for energy estimation. - --target-data TARGET_DATA - Speficy target plot data (preprocess, estimate) - --scenario SCENARIO Speficy scenario - --id ID specify machine id - --version VERSION Specify model server version. - --publisher PUBLISHER - Specify github account of model publisher - --include-raw INCLUDE_RAW - Include raw query data -``` - ## Get started 1. 
Deploy Kepler with Prometheus in the cluster exporting prometheus to port `:9090` diff --git a/cmd/cmd_plot.py b/cmd/cmd_plot.py new file mode 100644 index 00000000..84bfc83e --- /dev/null +++ b/cmd/cmd_plot.py @@ -0,0 +1,113 @@ +import os +import sys + +cur_path = os.path.join(os.path.dirname(__file__), '.') +sys.path.append(cur_path) +src_path = os.path.join(os.path.dirname(__file__), '..', 'src') +sys.path.append(src_path) + +from util.prom_types import TIMESTAMP_COL +from util import PowerSourceMap + + +def ts_plot(data, cols, title, output_folder, name, labels=None, subtitles=None, ylabel=None): + plot_height = 3 + plot_width = 10 + import matplotlib.pyplot as plt + import seaborn as sns + sns.set(font_scale=1.2) + fig, axes = plt.subplots(len(cols), 1, figsize=(plot_width, len(cols)*plot_height)) + for i in range(0, len(cols)): + if len(cols) == 1: + ax = axes + else: + ax = axes[i] + if isinstance(cols[i], list): + # multiple lines + for j in range(0, len(cols[i])): + sns.lineplot(data=data, x=TIMESTAMP_COL, y=cols[i][j], ax=ax, label=labels[j]) + ax.set_title(subtitles[i]) + else: + sns.lineplot(data=data, x=TIMESTAMP_COL, y=cols[i], ax=ax) + ax.set_title(cols[i]) + if ylabel is not None: + ax.set_ylabel(ylabel) + plt.suptitle(title, x=0.5, y=0.99) + plt.tight_layout() + filename = os.path.join(output_folder, name + ".png") + fig.savefig(filename) + plt.close() + +def feature_power_plot(data, model_id, output_type, energy_source, feature_cols, actual_power_cols, predicted_power_cols, output_folder, name): + plot_height = 5 + plot_width = 5 + + import matplotlib.pyplot as plt + import seaborn as sns + sns.set(font_scale=1.2) + row_num = len(feature_cols) + col_num = len(actual_power_cols) + width = max(10, col_num*plot_width) + fig, axes = plt.subplots(row_num, col_num, figsize=(width, row_num*plot_height)) + for xi in range(0, row_num): + feature_col = feature_cols[xi] + for yi in range(0, col_num): + if row_num == 1: + if col_num == 1: + ax = axes + else: + ax = axes[yi] + else: + if col_num == 1: + ax = axes[xi] + else: + ax = axes[xi][yi] + sorted_data = data.sort_values(by=[feature_col]) + sns.scatterplot(data=sorted_data, x=feature_col, y=actual_power_cols[yi], ax=ax, label="actual") + sns.lineplot(data=sorted_data, x=feature_col, y=predicted_power_cols[yi], ax=ax, label="predicted", color='C1') + if xi == 0: + ax.set_title(actual_power_cols[yi]) + if yi == 0: + ax.set_ylabel("Power (W)") + title = "{} {} prediction correlation \n by {}".format(energy_source, output_type, model_id) + plt.suptitle(title, x=0.5, y=0.99) + plt.tight_layout() + filename = os.path.join(output_folder, name + ".png") + fig.savefig(filename) + plt.close() + +def summary_plot(args, energy_source, summary_df, output_folder, name): + if len(summary_df) == 0: + print("no summary data to plot") + return + + plot_height = 3 + plot_width = 20 + + import matplotlib.pyplot as plt + import seaborn as sns + sns.set(font_scale=1.2) + + energy_components = PowerSourceMap[energy_source] + col_num = len(energy_components) + fig, axes = plt.subplots(col_num, 1, figsize=(plot_width, plot_height*col_num)) + for i in range(0, col_num): + component = energy_components[i] + data = summary_df[(summary_df["energy_source"]==energy_source) & (summary_df["energy_component"]==component)] + data = data.sort_values(by=["Feature Group", "MAE"]) + if col_num == 1: + ax = axes + else: + ax = axes[i] + sns.barplot(data=data, x="Feature Group", y="MAE", hue="Model", ax=ax) + ax.set_title(component) + ax.set_ylabel("MAE (Watt)") + 
ax.set_ylim((0, 100)) + if i < col_num-1: + ax.set_xlabel("") + ax.legend(bbox_to_anchor=(1.05, 1.05)) + plt.suptitle("{} {} error".format(energy_source, args.output_type)) + plt.tight_layout() + filename = os.path.join(output_folder, name + ".png") + fig.savefig(filename) + plt.close() diff --git a/cmd/cmd_util.py b/cmd/cmd_util.py new file mode 100644 index 00000000..a64e202f --- /dev/null +++ b/cmd/cmd_util.py @@ -0,0 +1,300 @@ +import os +import sys +import datetime +import pandas as pd + +UTC_OFFSET_TIMEDELTA = datetime.datetime.utcnow() - datetime.datetime.now() + +cur_path = os.path.join(os.path.dirname(__file__), '.') +sys.path.append(cur_path) +src_path = os.path.join(os.path.dirname(__file__), '..', 'src') +sys.path.append(src_path) + +from util.prom_types import node_info_column, prom_responses_to_results +from util.train_types import ModelOutputType, FeatureGroup +from util.loader import load_json, get_pipeline_path + +def print_file_to_stdout(data_path, args): + file_path = os.path.join(data_path, args.output) + try: + with open(file_path, 'r') as file: + contents = file.read() + print(contents) + except FileNotFoundError: + print(f"Error: Output '{file_path}' not found.") + except IOError: + print(f"Error: Unable to read output '{file_path}'.") + +def extract_time(data_path, benchmark_filename): + data = load_json(data_path, benchmark_filename) + if benchmark_filename != "customBenchmark": + start_str = data["metadata"]["creationTimestamp"] + start = datetime.datetime.strptime(start_str, '%Y-%m-%dT%H:%M:%SZ') + end_str = data["status"]["results"][-1]["repetitions"][-1]["pushedTime"].split(".")[0] + end = datetime.datetime.strptime(end_str, '%Y-%m-%d %H:%M:%S') + else: + start_str = data["startTimeUTC"] + start = datetime.datetime.strptime(start_str, '%Y-%m-%dT%H:%M:%SZ') + end_str = data["endTimeUTC"] + end = datetime.datetime.strptime(end_str, '%Y-%m-%dT%H:%M:%SZ') + print(UTC_OFFSET_TIMEDELTA) + return start-UTC_OFFSET_TIMEDELTA, end-UTC_OFFSET_TIMEDELTA + +def summary_validation(validate_df): + if len(validate_df) == 0: + print("No data for validation.") + return + items = [] + metric_to_validate_pod = { + "cgroup": "kepler_container_cgroupfs_cpu_usage_us_total", + # "hwc": "kepler_container_cpu_instructions_total", + "hwc": "kepler_container_cpu_cycles_total", + "kubelet": "kepler_container_kubelet_cpu_usage_total", + "bpf": "kepler_container_bpf_cpu_time_us_total", + } + metric_to_validate_power = { + "rapl": "kepler_node_package_joules_total", + "platform": "kepler_node_platform_joules_total" + } + for metric, query in metric_to_validate_pod.items(): + target_df = validate_df[validate_df["query"]==query] + valid_df = target_df[target_df[">0"] > 0] + if len(valid_df) == 0: + # no data + continue + availability = len(valid_df)/len(target_df) + valid_datapoint = valid_df[">0"].sum() + item = dict() + item["usage_metric"] = metric + item["availability"] = availability + item["valid_datapoint"] = valid_datapoint + items += [item] + summary_df = pd.DataFrame(items) + print(summary_df) + for metric, query in metric_to_validate_pod.items(): + target_df = validate_df[validate_df["query"]==query] + no_data_df = target_df[target_df["count"] == 0] + zero_data_df = target_df[target_df[">0"] == 0] + valid_df = target_df[target_df[">0"] > 0] + print("==== {} ====".format(metric)) + if len(no_data_df) > 0: + print("{} pods: \tNo data for {}".format(len(no_data_df), pd.unique(no_data_df["scenarioID"]))) + if len(zero_data_df) > 0: + print("{} pods: \tZero data for 
{}".format(len(zero_data_df), pd.unique(zero_data_df["scenarioID"]))) + + print("{} pods: \tValid\n".format(len(valid_df))) + print("Valid data points:") + print( "Empty" if len(valid_df[">0"]) == 0 else valid_df.groupby(["scenarioID"]).sum()[[">0"]]) + for metric, query in metric_to_validate_power.items(): + target_df = validate_df[validate_df["query"]==query] + print("{} data: \t{}".format(metric, target_df[">0"].values)) + +def get_validate_df(data_path, benchmark_filename, query_response): + items = [] + query_results = prom_responses_to_results(query_response) + container_queries = [query for query in query_results.keys() if "container" in query] + status_data = load_json(data_path, benchmark_filename) + if status_data is None or status_data.get("status", None) == None: + # select all with keyword + for query in container_queries: + df = query_results[query] + if len(df) == 0: + # set validate item // no value + item = dict() + item["pod"] = benchmark_filename + item["scenarioID"] = "" + item["query"] = query + item["count"] = 0 + item[">0"] = 0 + item["total"] = 0 + items += [item] + continue + filtered_df = df[df["pod_name"].str.contains(benchmark_filename)] + # set validate item + item = dict() + item["pod"] = benchmark_filename + item["scenarioID"] = "" + item["query"] = query + item["count"] = len(filtered_df) + item[">0"] = len(filtered_df[filtered_df[query] > 0]) + item["total"] = filtered_df[query].max() + items += [item] + else: + cpe_results = status_data["status"]["results"] + for result in cpe_results: + scenarioID = result["scenarioID"] + scenarios = result["scenarios"] + configurations = result["configurations"] + for k, v in scenarios.items(): + result[k] = v + for k, v in configurations.items(): + result[k] = v + repetitions = result["repetitions"] + for rep in repetitions: + podname = rep["pod"] + for query in container_queries: + df = query_results[query] + if len(df) == 0: + # set validate item // no value + item = dict() + item["pod"] = podname + item["scenarioID"] = scenarioID + item["query"] = query + item["count"] = 0 + item[">0"] = 0 + item["total"] = 0 + items += [item] + continue + filtered_df = df[df["pod_name"]==podname] + # set validate item + item = dict() + item["pod"] = podname + item["scenarioID"] = scenarioID + item["query"] = query + item["count"] = len(filtered_df) + item[">0"] = len(filtered_df[filtered_df[query] > 0]) + item["total"] = filtered_df[query].max() + items += [item] + energy_queries = [query for query in query_results.keys() if "_joules" in query] + for query in energy_queries: + df = query_results[query] + if len(df) == 0: + # set validate item // no value + item = dict() + item["pod"] = "" + item["scenarioID"] = "" + item["query"] = query + item["count"] = 0 + item[">0"] = 0 + item["total"] = 0 + items += [item] + continue + # set validate item + item = dict() + item["pod"] = "" + item["scenarioID"] = "" + item["query"] = query + item["count"] = len(df) + item[">0"] = len(df[df[query] > 0]) + item["total"] = df[query].max() + items += [item] + other_queries = [query for query in query_results.keys() if query not in container_queries and query not in energy_queries] + for query in other_queries: + df = query_results[query] + if len(df) == 0: + # set validate item // no value + item = dict() + item["pod"] = benchmark_filename + item["scenarioID"] = "" + item["query"] = query + item["count"] = 0 + item[">0"] = 0 + item["total"] = 0 + items += [item] + continue + # set validate item + item = dict() + item["pod"] = benchmark_filename + 
item["scenarioID"] = "" + item["query"] = query + item["count"] = len(df) + item[">0"] = len(df[df[query] > 0]) + item["total"] = df[query].max() + items += [item] + validate_df = pd.DataFrame(items) + if not validate_df.empty: + print(validate_df.groupby(["scenarioID", "query"]).sum()[["count", ">0"]]) + return validate_df + +def check_ot_fg(args, valid_fg): + ot = None + fg = None + if args.output_type: + try: + ot = ModelOutputType[args.output_type] + except KeyError: + print("invalid output type. please use AbsPower or DynPower", args.output_type) + exit() + if args.feature_group: + valid_fg_name_list = [fg.name for fg in valid_fg] + try: + fg = FeatureGroup[args.feature_group] + if args.feature_group not in valid_fg_name_list: + print("feature group: {} is not available in your data. please choose from the following list: {}".format(args.feature_group, valid_fg_name_list)) + exit() + except KeyError: + print("invalid feature group: {}. valid feature group are {}.".format((args.feature_group, [fg.name for fg in valid_fg]))) + exit() + return ot, fg + +def assert_train(trainer, data, energy_components): + import pandas as pd + node_types = pd.unique(data[node_info_column]) + for node_type in node_types: + node_type_str = int(node_type) + node_type_filtered_data = data[data[node_info_column] == node_type] + X_values = node_type_filtered_data[trainer.features].values + for component in energy_components: + output = trainer.predict(node_type_str, component, X_values) + if output is not None: + assert len(output) == len(X_values), "length of predicted values != features ({}!={})".format(len(output), len(X_values)) + +def get_isolator(data_path, isolator, profile, pipeline_name, target_hints, bg_hints, abs_pipeline_name): + pipeline_path = get_pipeline_path(data_path, pipeline_name=pipeline_name) + from train import MinIdleIsolator, NoneIsolator, DefaultProfiler, ProfileBackgroundIsolator, TrainIsolator, generate_profiles + supported_isolator = { + MinIdleIsolator().get_name(): MinIdleIsolator(), + NoneIsolator().get_name(): NoneIsolator(), + } + + if target_hints: + target_hints = target_hints.split(",") + else: + target_hints = [] + + if bg_hints: + bg_hints = bg_hints.split(",") + else: + bg_hints = [] + + profiles = dict() + if profile: + idle_response = load_json(data_path, profile) + idle_data = prom_responses_to_results(idle_response) + if idle_data is None: + print("failed to read idle data") + return None + profile_map = DefaultProfiler.process(idle_data, profile_top_path=pipeline_path) + profiles = generate_profiles(profile_map) + profile_isolator = ProfileBackgroundIsolator(profiles, idle_data) + supported_isolator[profile_isolator.get_name()] = profile_isolator + if abs_pipeline_name != "": + trainer_isolator = TrainIsolator(idle_data=idle_data, profiler=DefaultProfiler, target_hints=target_hints, bg_hints=bg_hints, abs_pipeline_name=abs_pipeline_name) + supported_isolator[trainer_isolator.get_name()] = trainer_isolator + else: + if abs_pipeline_name != "": + trainer_isolator = TrainIsolator(target_hints=target_hints, bg_hints=bg_hints, abs_pipeline_name=abs_pipeline_name) + supported_isolator[trainer_isolator.get_name()] = trainer_isolator + + if isolator not in supported_isolator: + print("isolator {} is not supported. 
supported isolator: {}".format(isolator, supported_isolator.keys())) + return None + return supported_isolator[isolator] + +def get_extractor(extractor): + from train import DefaultExtractor, SmoothExtractor + supported_extractor = { + DefaultExtractor().get_name(): DefaultExtractor(), + SmoothExtractor().get_name(): SmoothExtractor() + } + if extractor not in supported_extractor: + print("extractor {} is not supported. supported extractor: {}".format(extractor, supported_extractor.keys())) + return None + return supported_extractor[extractor] + +def get_pipeline(data_path, pipeline_name, extractor, profile, target_hints, bg_hints, abs_pipeline_name, isolator, abs_trainer_names, dyn_trainer_names, energy_sources, valid_feature_groups): + from train import NewPipeline + isolator = get_isolator(data_path, isolator, profile, pipeline_name, target_hints, bg_hints, abs_pipeline_name) + extractor = get_extractor(extractor) + pipeline = NewPipeline(pipeline_name, abs_trainer_names, dyn_trainer_names, extractor=extractor, isolator=isolator, target_energy_sources=energy_sources ,valid_feature_groups=valid_feature_groups) + return pipeline \ No newline at end of file diff --git a/cmd/main.py b/cmd/main.py index 15bd6751..b4391855 100644 --- a/cmd/main.py +++ b/cmd/main.py @@ -10,7 +10,6 @@ default_trainers = ",".join(default_trainer_names) default_version = "v0.7" -UTC_OFFSET_TIMEDELTA = datetime.datetime.utcnow() - datetime.datetime.now() data_path = os.getenv("CPE_DATAPATH", data_path) cur_path = os.path.join(os.path.dirname(__file__), '.') @@ -19,238 +18,60 @@ sys.path.append(src_path) from util.prom_types import PROM_SERVER, PROM_QUERY_INTERVAL, PROM_QUERY_STEP, PROM_QUERY_START_TIME, PROM_QUERY_END_TIME, PROM_HEADERS, PROM_SSL_DISABLE, PROM_THIRDPARTY_METRICS -from util.prom_types import metric_prefix as KEPLER_METRIC_PREFIX, node_info_column, prom_responses_to_results, TIMESTAMP_COL, feature_to_query, update_thirdparty_metrics -from util.train_types import ModelOutputType, FeatureGroup, FeatureGroups, is_single_source_feature_group, SingleSourceFeatures +from util.prom_types import metric_prefix as KEPLER_METRIC_PREFIX, prom_responses_to_results, TIMESTAMP_COL, feature_to_query, update_thirdparty_metrics +from util.extract_types import get_expected_power_columns +from util.train_types import ModelOutputType, FeatureGroups, is_single_source_feature_group, SingleSourceFeatures, all_feature_groups from util.loader import load_json, DEFAULT_PIPELINE, load_pipeline_metadata, get_pipeline_path, get_model_group_path, list_pipelines, list_model_names, load_metadata, load_csv, get_machine_path, get_preprocess_folder, get_general_filename from util.saver import save_json, save_csv, save_train_args from util.config import ERROR_KEY from util import get_valid_feature_group_from_queries, PowerSourceMap from train.prom.prom_query import _range_queries from train.exporter import exporter +from train import load_class +from cmd_plot import ts_plot, feature_power_plot, summary_plot +from cmd_util import extract_time, get_validate_df, summary_validation, get_extractor, check_ot_fg, get_pipeline, assert_train, get_isolator, UTC_OFFSET_TIMEDELTA -def print_file_to_stdout(args): - file_path = os.path.join(data_path, args.output) - try: - with open(file_path, 'r') as file: - contents = file.read() - print(contents) - except FileNotFoundError: - print(f"Error: Output '{file_path}' not found.") - except IOError: - print(f"Error: Unable to read output '{file_path}'.") - -def extract_time(benchmark_filename): - data = 
load_json(data_path, benchmark_filename) - if benchmark_filename != "customBenchmark": - start_str = data["metadata"]["creationTimestamp"] - start = datetime.datetime.strptime(start_str, '%Y-%m-%dT%H:%M:%SZ') - end_str = data["status"]["results"][-1]["repetitions"][-1]["pushedTime"].split(".")[0] - end = datetime.datetime.strptime(end_str, '%Y-%m-%d %H:%M:%S') - else: - start_str = data["startTimeUTC"] - start = datetime.datetime.strptime(start_str, '%Y-%m-%dT%H:%M:%SZ') - end_str = data["endTimeUTC"] - end = datetime.datetime.strptime(end_str, '%Y-%m-%dT%H:%M:%SZ') - print(UTC_OFFSET_TIMEDELTA) - return start-UTC_OFFSET_TIMEDELTA, end-UTC_OFFSET_TIMEDELTA - -def summary_validation(validate_df): - if len(validate_df) == 0: - print("No data for validation.") - return - items = [] - metric_to_validate_pod = { - "cgroup": "kepler_container_cgroupfs_cpu_usage_us_total", - # "hwc": "kepler_container_cpu_instructions_total", - "hwc": "kepler_container_cpu_cycles_total", - "kubelet": "kepler_container_kubelet_cpu_usage_total", - "bpf": "kepler_container_bpf_cpu_time_us_total", - } - metric_to_validate_power = { - "rapl": "kepler_node_package_joules_total", - "platform": "kepler_node_platform_joules_total" - } - for metric, query in metric_to_validate_pod.items(): - target_df = validate_df[validate_df["query"]==query] - valid_df = target_df[target_df[">0"] > 0] - if len(valid_df) == 0: - # no data - continue - availability = len(valid_df)/len(target_df) - valid_datapoint = valid_df[">0"].sum() - item = dict() - item["usage_metric"] = metric - item["availability"] = availability - item["valid_datapoint"] = valid_datapoint - items += [item] - summary_df = pd.DataFrame(items) - print(summary_df) - for metric, query in metric_to_validate_pod.items(): - target_df = validate_df[validate_df["query"]==query] - no_data_df = target_df[target_df["count"] == 0] - zero_data_df = target_df[target_df[">0"] == 0] - valid_df = target_df[target_df[">0"] > 0] - print("==== {} ====".format(metric)) - if len(no_data_df) > 0: - print("{} pods: \tNo data for {}".format(len(no_data_df), pd.unique(no_data_df["scenarioID"]))) - if len(zero_data_df) > 0: - print("{} pods: \tZero data for {}".format(len(zero_data_df), pd.unique(zero_data_df["scenarioID"]))) - - print("{} pods: \tValid\n".format(len(valid_df))) - print("Valid data points:") - print( "Empty" if len(valid_df[">0"]) == 0 else valid_df.groupby(["scenarioID"]).sum()[[">0"]]) - for metric, query in metric_to_validate_power.items(): - target_df = validate_df[validate_df["query"]==query] - print("{} data: \t{}".format(metric, target_df[">0"].values)) - -def get_validate_df(benchmark_filename, query_response): - items = [] - query_results = prom_responses_to_results(query_response) - container_queries = [query for query in query_results.keys() if "container" in query] - status_data = load_json(data_path, benchmark_filename) - if status_data is None or status_data.get("status", None) == None: - # select all with keyword - for query in container_queries: - df = query_results[query] - if len(df) == 0: - # set validate item // no value - item = dict() - item["pod"] = benchmark_filename - item["scenarioID"] = "" - item["query"] = query - item["count"] = 0 - item[">0"] = 0 - item["total"] = 0 - items += [item] - continue - filtered_df = df[df["pod_name"].str.contains(benchmark_filename)] - # set validate item - item = dict() - item["pod"] = benchmark_filename - item["scenarioID"] = "" - item["query"] = query - item["count"] = len(filtered_df) - item[">0"] = 
len(filtered_df[filtered_df[query] > 0]) - item["total"] = filtered_df[query].max() - items += [item] - else: - cpe_results = status_data["status"]["results"] - for result in cpe_results: - scenarioID = result["scenarioID"] - scenarios = result["scenarios"] - configurations = result["configurations"] - for k, v in scenarios.items(): - result[k] = v - for k, v in configurations.items(): - result[k] = v - repetitions = result["repetitions"] - for rep in repetitions: - podname = rep["pod"] - for query in container_queries: - df = query_results[query] - if len(df) == 0: - # set validate item // no value - item = dict() - item["pod"] = podname - item["scenarioID"] = scenarioID - item["query"] = query - item["count"] = 0 - item[">0"] = 0 - item["total"] = 0 - items += [item] - continue - filtered_df = df[df["pod_name"]==podname] - # set validate item - item = dict() - item["pod"] = podname - item["scenarioID"] = scenarioID - item["query"] = query - item["count"] = len(filtered_df) - item[">0"] = len(filtered_df[filtered_df[query] > 0]) - item["total"] = filtered_df[query].max() - items += [item] - energy_queries = [query for query in query_results.keys() if "_joules" in query] - for query in energy_queries: - df = query_results[query] - if len(df) == 0: - # set validate item // no value - item = dict() - item["pod"] = "" - item["scenarioID"] = "" - item["query"] = query - item["count"] = 0 - item[">0"] = 0 - item["total"] = 0 - items += [item] - continue - # set validate item - item = dict() - item["pod"] = "" - item["scenarioID"] = "" - item["query"] = query - item["count"] = len(df) - item[">0"] = len(df[df[query] > 0]) - item["total"] = df[query].max() - items += [item] - other_queries = [query for query in query_results.keys() if query not in container_queries and query not in energy_queries] - for query in other_queries: - df = query_results[query] - if len(df) == 0: - # set validate item // no value - item = dict() - item["pod"] = benchmark_filename - item["scenarioID"] = "" - item["query"] = query - item["count"] = 0 - item[">0"] = 0 - item["total"] = 0 - items += [item] - continue - # set validate item - item = dict() - item["pod"] = benchmark_filename - item["scenarioID"] = "" - item["query"] = query - item["count"] = len(df) - item[">0"] = len(df[df[query] > 0]) - item["total"] = df[query].max() - items += [item] - validate_df = pd.DataFrame(items) - if not validate_df.empty: - print(validate_df.groupby(["scenarioID", "query"]).sum()[["count", ">0"]]) - return validate_df - -def check_ot_fg(args, valid_fg): - ot = None - fg = None - if args.output_type: - try: - ot = ModelOutputType[args.output_type] - except KeyError: - print("invalid output type. please use AbsPower or DynPower", args.output_type) - exit() - if args.feature_group: - valid_fg_name_list = [fg.name for fg in valid_fg] - try: - fg = FeatureGroup[args.feature_group] - if args.feature_group not in valid_fg_name_list: - print("feature group: {} is not available in your data. please choose from the following list: {}".format(args.feature_group, valid_fg_name_list)) - exit() - except KeyError: - print("invalid feature group: {}. valid feature group are {}.".format((args.feature_group, [fg.name for fg in valid_fg]))) - exit() - return ot, fg +import threading + +""" +query + + make a query to prometheus metric server for a specified set of queries (with metric prefix) and save as a map of metric name to dataframe containing labels and value of the metric. 
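+ example (hypothetical invocation; the server URL and file names are illustrative): python cmd/main.py query -s http://localhost:9090 --benchmark sample_query --interval 300 -o kepler_query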
+ + +arguments: +- --server : specify prometheus server URL (PROM_HEADERS and PROM_SSL_DISABLE configuration might be set via configmap or environment variables if needed) +- --output : specify query output file name. There will be two files generated: [output].json and [output]_validate_result.csv for kepler query response and validated results. +- --metric-prefix : specify prefix to filter target query metrics (default: kepler) +- --thirdparty-metrics : specify list of third party metrics to query in addition to the metrics with specified metric prefix (optional) +- Time range of query can be specified by either of the following choices + - --input : file containing data of start time and end time which could be either of the following format + - CPE benchmark resource in json if you run workload with CPE-operator (https://github.com/IBM/cpe-operator) + - custom benchmark in json with `startTimeUTC` and `endTimeUTC` data + - --benchmark : file to save query timestamp according to either of the following raw parameters + - --start-time, --end-time : start time and end time in UTC (date -u +%Y-%m-%dT%H:%M:%SZ) + - --interval : last interval in second + * The priority is input > start-time,end-time > interval +""" def query(args): from prometheus_api_client import PrometheusConnect prom = PrometheusConnect(url=args.server, headers=PROM_HEADERS, disable_ssl=PROM_SSL_DISABLE) - benchmark_filename = args.input - if benchmark_filename == "": - print("need name of the benchmark: sample, stressng, or customBenchmark") - exit() - filepath = os.path.join(data_path, benchmark_filename+".json") - if not os.path.isfile(filepath): + start = None + end = None + if args.input: + benchmark_filename = args.input + filepath = os.path.join(data_path, benchmark_filename+".json") + if os.path.isfile(filepath): + print("Query from {}.".format(benchmark_filename)) + start, end = extract_time(data_path, benchmark_filename) + if start is None or end is None: + if args.benchmark: + benchmark_filename = args.benchmark + else: + print("Please provide either input (for providing timestamp file) or benchmark (for saving query timestamp)") + exit() if args.start_time != "" and args.end_time != "": # by [start time, end time] print("Query from start_time {} to end_time {}.".format(args.start_time, args.end_time)) @@ -268,9 +89,7 @@ def query(args): save_json(path=data_path, name=benchmark_filename, data=item) start = datetime.datetime.strptime(item["startTimeUTC"], '%Y-%m-%dT%H:%M:%SZ') - UTC_OFFSET_TIMEDELTA end = datetime.datetime.strptime(item["endTimeUTC"], '%Y-%m-%dT%H:%M:%SZ') - UTC_OFFSET_TIMEDELTA - else: - print("Query from {}.".format(benchmark_filename)) - start, end = extract_time(benchmark_filename) + available_metrics = prom.all_metrics() queries = None @@ -285,91 +104,46 @@ def query(args): response = _range_queries(prom, queries, start, end, args.step, None) save_json(path=data_path, name=args.output, data=response) # try validation if applicable - validate_df = get_validate_df(benchmark_filename, response) + validate_df = get_validate_df(data_path, benchmark_filename, response) summary_validation(validate_df) save_csv(path=data_path, name=args.output + "_validate_result", data=validate_df) -def validate(args): - if not args.benchmark: - print("Need --benchmark") - exit() +""" +validate + + quickly validate each queried metric that how many data it contains and how many of them have non-zero values based on benchmark file +arguments: +- --input : specify kepler query response file (output of `query` 
function) +- --output : specify output file name. The output will be in CSV format +- --benchmark : if benchmark is based on CPE, the validated result will be grouped by the scenario. + Otherwise, the validated result will be accumulated over all containers. +""" + +def validate(args): response_filename = args.input response = load_json(data_path, response_filename) - validate_df = get_validate_df(args.benchmark, response) + validate_df = get_validate_df(data_path, args.benchmark, response) summary_validation(validate_df) if args.output: save_csv(path=data_path, name=args.output, data=validate_df) -def assert_train(trainer, data, energy_components): - import pandas as pd - node_types = pd.unique(data[node_info_column]) - for node_type in node_types: - node_type_str = int(node_type) - node_type_filtered_data = data[data[node_info_column] == node_type] - X_values = node_type_filtered_data[trainer.features].values - for component in energy_components: - output = trainer.predict(node_type_str, component, X_values) - if output is not None: - assert len(output) == len(X_values), "length of predicted values != features ({}!={})".format(len(output), len(X_values)) - -def get_isolator(isolator, profile, pipeline_name, target_hints, bg_hints): - pipeline_path = get_pipeline_path(data_path, pipeline_name=pipeline_name) - from train import MinIdleIsolator, NoneIsolator, DefaultProfiler, ProfileBackgroundIsolator, TrainIsolator, generate_profiles - supported_isolator = { - MinIdleIsolator().get_name(): MinIdleIsolator(), - NoneIsolator().get_name(): NoneIsolator(), - } - - if target_hints: - target_hints = target_hints.split(",") - else: - target_hints = [] +""" +extract - if bg_hints: - bg_hints = bg_hints.split(",") - else: - bg_hints = [] - - profiles = dict() - if profile: - idle_response = load_json(data_path, profile) - idle_data = prom_responses_to_results(idle_response) - if idle_data is None: - print("failed to read idle data") - return None - profile_map = DefaultProfiler.process(idle_data, profile_top_path=pipeline_path) - profiles = generate_profiles(profile_map) - profile_isolator = ProfileBackgroundIsolator(profiles, idle_data) - trainer_isolator = TrainIsolator(idle_data=idle_data, profiler=DefaultProfiler, target_hints=target_hints, bg_hints=bg_hints, abs_pipeline_name=pipeline_name) - supported_isolator[profile_isolator.get_name()] = profile_isolator - else: - trainer_isolator = TrainIsolator(target_hints=target_hints, bg_hints=bg_hints, abs_pipeline_name=pipeline_name) - - supported_isolator[trainer_isolator.get_name()] = trainer_isolator - - if isolator not in supported_isolator: - print("isolator {} is not supported. supported isolator: {}".format(isolator, supported_isolator.keys())) - return None - return supported_isolator[isolator] - -def get_extractor(extractor): - from train import DefaultExtractor, SmoothExtractor - supported_extractor = { - DefaultExtractor().get_name(): DefaultExtractor(), - SmoothExtractor().get_name(): SmoothExtractor() - } - if extractor not in supported_extractor: - print("extractor {} is not supported. 
supported extractor: {}".format(extractor, supported_extractor.keys())) - return None - return supported_extractor[extractor] + parse kepler query response, filter only target feature group and energy source, and calculate per-second (guage) value at each timestamp -def get_pipeline(pipeline_name, extractor, profile, target_hints, bg_hints, isolator, abs_trainer_names, dyn_trainer_names, energy_sources, valid_feature_groups): - from train import NewPipeline - isolator = get_isolator(isolator, profile, pipeline_name, target_hints, bg_hints) - extractor = get_extractor(extractor) - pipeline = NewPipeline(pipeline_name, abs_trainer_names, dyn_trainer_names, extractor=extractor, isolator=isolator, target_energy_sources=energy_sources ,valid_feature_groups=valid_feature_groups) - return pipeline +arguments: +- --input : specify kepler query response file (output of `query` function) as an input to the extractor +- --output : specify extracted file name. There will be two files generated: extracted_[output].csv and extracted_[output]_raw.csv for guage and counter data, respectively. +- --extractor : specify extractor class (default or smooth) +- --feature-group : specify target feature group (check https://sustainable-computing.io/kepler_model_server/pipeline/#feature-group) +- --energy-source : specify target energy source (check https://sustainable-computing.io/kepler_model_server/pipeline/#energy-source) +- --output-type : specify target output type, (default = AbsPower) + - AbsPower (index = timestamp), DynPower (index = timestamp and container ID) + - check https://sustainable-computing.io/kepler_model_server/pipeline/#power-isolation +- --thirdparty-metrics : specify list of third party metric to export (required only for ThirdParty feature group) +""" def extract(args): extractor = get_extractor(args.extractor) @@ -398,17 +172,151 @@ def extract(args): query = feature_to_query(FeatureGroups[fg][0]) raw_data = query_results[query][[TIMESTAMP_COL, query]].groupby([TIMESTAMP_COL]).sum() save_csv(data_path, "extracted_" + args.output[0:-4]+"_raw.csv", raw_data) + print("extract {} train data to {}".format(args.output_type, "extracted_" + args.output)) return feature_power_data, power_cols +""" +isolate + + extract data from kepler query and remove idle/background power from the power columns (check https://sustainable-computing.io/kepler_model_server/pipeline/#power-isolation) + +arguments: +- --input : specify kepler query response file (output of `query` function) +- --output : specify extracted file name. 
+ There will be three files generated: + - extracted_[output].csv for extracted data (gauge values) + - extracted_[output]_raw.csv for raw extracted data (counter values) + - isolated_[output].csv for isolated data +- --extractor : specify extractor class (default or smooth) +- --isolator : specify isolator class (none, profile, min, trainer) +- --feature-group : specify target feature group (check https://sustainable-computing.io/kepler_model_server/pipeline/#feature-group) +- --energy-source : specify target energy source (check https://sustainable-computing.io/kepler_model_server/pipeline/#energy-source) +- --output-type : specify target extracting output type, (default = AbsPower) + - AbsPower (index = timestamp), DynPower (index = timestamp and container ID) + - check https://sustainable-computing.io/kepler_model_server/pipeline/#power-isolation +- --thirdparty-metrics : specify list of third party metric to export (required only for ThirdParty feature group) +- --pipeline-name : specify pipeline name to be used for initializing profile and trainer isolator (only required for profile and trainer isolator) +- --profile : specify kepler query result when running no workload (required only for profile isolator) +- Hints required only for trainer isolator to separate background process can be specified by either of + - --target-hints : specify target process keywords (pod's substring with comma delimiter) to keep in DynPower model training + - --bg-hints : specify background process keywords to remove from DynPower model training + * If both are defined, target-hints will be considered first. +""" + def isolate(args): extracted_data, power_labels = extract(args) if extracted_data is None or power_labels is None: return None pipeline_name = DEFAULT_PIPELINE if not args.pipeline_name else args.pipeline_name - isolator = get_isolator(args.isolator, args.profile, pipeline_name, args.target_hints, args.bg_hints) + isolator = get_isolator(data_path, args.isolator, args.profile, pipeline_name, args.target_hints, args.bg_hints, args.abs_pipeline_name) isolated_data = isolator.isolate(extracted_data, label_cols=power_labels, energy_source=args.energy_source) if args.output: save_csv(data_path, "isolated_" + args.output, isolated_data) + print("isolate train data to {}".format("isolated_" + args.output)) + +""" +isolate_from_data + + remove idle/background power from the power columns read from extracted data from `extract` function output + +arguments: +- --input : specify extracted output file +- --output : specify isolated file name. 
The output filename is isolated_[output].csv +- --isolator : specify isolator class (none, profile, min, trainer) +- --feature-group : specify target feature group (check https://sustainable-computing.io/kepler_model_server/pipeline/#feature-group) +- --energy-source : specify target energy source (check https://sustainable-computing.io/kepler_model_server/pipeline/#energy-source) +- --thirdparty-metrics : specify list of third party metric to export (required only for ThirdParty feature group) +- --pipeline-name : specify pipeline name to be used for initializing profile and trainer isolator (only required for profile and trainer isolator) +- --profile : specify kepler query result when running no workload (required only for profile isolator) +- Hints required only for trainer isolator to separate background process can be specified by either of + - --target-hints : specify target process keywords (pod's substring with comma delimiter) to keep in DynPower model training + - --bg-hints : specify background process keywords to remove from DynPower model training + * If both are defined, target-hints will be considered first. +""" + +def isolate_from_data(args): + energy_components = PowerSourceMap[args.energy_source] + extracted_data = load_csv(data_path, "extracted_" + args.input) + power_columns = get_expected_power_columns(energy_components=energy_components) + pipeline_name = DEFAULT_PIPELINE if not args.pipeline_name else args.pipeline_name + isolator = get_isolator(data_path, args.isolator, args.profile, pipeline_name, args.target_hints, args.bg_hints, args.abs_pipeline_name) + isolated_data = isolator.isolate(extracted_data, label_cols=power_columns, energy_source=args.energy_source) + if args.output: + save_csv(data_path, "isolated_" + args.output, isolated_data) + +""" +train_from_data + + train extracted/isolated data for AbsPower/DynPower models from `extract` or `isolate`/`isolate_from_data` function output + +arguments: +- --input : specify extracted/isolated output file +- --pipeline-name : specify output pipeline name. + - The trained model will be saved in folder [pipeline_name]/[energy_source]/[output_type]/[feature_group]/[model_name]_[node_type] +- --feature-group : specify target feature group (check https://sustainable-computing.io/kepler_model_server/pipeline/#feature-group) +- --energy-source : specify target energy source (check https://sustainable-computing.io/kepler_model_server/pipeline/#energy-source) +- --output-type : specify target output type (check https://sustainable-computing.io/kepler_model_server/pipeline/#power-isolation) - default: AbsPower +- --trainers : specify trainer names (use comma(,) as delimiter, default: XgboostFitTrainer) +- --thirdparty-metrics : specify list of third party metric to export (required only for ThirdParty feature group) +""" + +def train_from_data(args): + + # Inject thirdparty_metrics to FeatureGroup + if args.thirdparty_metrics != "": + update_thirdparty_metrics(args.thirdparty_metrics) + elif PROM_THIRDPARTY_METRICS != [""]: + update_thirdparty_metrics(PROM_THIRDPARTY_METRICS) + valid_fg = [fg_key for fg_key in FeatureGroups.keys()] + ot, fg = check_ot_fg(args, valid_fg) + if fg is None or ot is None: + print("feature group {} or model output type {} is wrong. 
(valid feature group: {})".format(args.feature_group, args.output_type, all_feature_groups)) + exit() + + energy_components = PowerSourceMap[args.energy_source] + node_level=False + if ot == ModelOutputType.AbsPower: + node_level=True + + data = load_csv(data_path, args.input) + power_columns = get_expected_power_columns(energy_components=energy_components) + + energy_components = PowerSourceMap[args.energy_source] + + trainers = args.trainers.split(",") + metadata_list = [] + for trainer in trainers: + trainer_class = load_class("trainer", trainer) + trainer = trainer_class(energy_components, args.feature_group, args.energy_source, node_level=node_level, pipeline_name=args.pipeline_name) + trainer.process(data, power_columns, pipeline_lock=threading.Lock()) + assert_train(trainer, data, energy_components) + metadata = trainer.get_metadata() + metadata_list += [metadata] + metadata_df = pd.concat(metadata_list) + print_cols = ["model_name", "mae", "mape"] + print(metadata_df[print_cols]) + +""" +train + + automate a pipeline process to kepler query input from `query` function output to all available feature groups for both AbsPower and DynPower + +arguments: +- --input : specify kepler query response file (output of `query` function) +- --pipeline-name : specify output pipeline name. + - The trained model will be saved in folder [pipeline_name]/[energy_source]/[output_type]/[feature_group]/[model_name]_[node_type] +- --extractor : specify extractor class (default or smooth) +- --isolator : specify isolator class (none, profile, min, trainer) +- --profile : specify kepler query result when running no workload (required only for profile isolator) +- Hints required only for trainer isolator to separate background process can be specified by either of + - --target-hints : specify target process keywords (pod's substring with comma delimiter) to keep in DynPower model training + - --bg-hints : specify background process keywords to remove from DynPower model training + * If both are defined, target-hints will be considered first. 
+- --abs-trainers : specify a list of trainers for training AbsPower models (use comma(,) as delimiter) - default: apply all available trainers +- --dyn-trainers : specify a list of trainers for training DynPower models (use comma(,) as delimiter) - default: apply all available trainers +- --energy-source : specify target energy sources (use comma(,) as delimiter) +- --thirdparty-metrics : specify list of third party metric to export (required only for ThirdParty feature group) +""" def train(args): import warnings @@ -419,6 +327,12 @@ def train(args): print("must give input filename (query response) via --input for training.") exit() + # Inject thirdparty_metrics to FeatureGroup + if args.thirdparty_metrics != "": + update_thirdparty_metrics(args.thirdparty_metrics) + elif PROM_THIRDPARTY_METRICS != [""]: + update_thirdparty_metrics(PROM_THIRDPARTY_METRICS) + pipeline_name = DEFAULT_PIPELINE if args.pipeline_name: pipeline_name = args.pipeline_name @@ -439,10 +353,14 @@ def train(args): valid_feature_groups = list(set(valid_feature_groups).intersection(set(valid_fg))) input_query_results_list += [query_results] + if args.dyn_trainers == "default": + args.dyn_trainers = default_trainers + if args.abs_trainers == "default": + args.abs_trainers = default_trainers abs_trainer_names = args.abs_trainers.split(",") dyn_trainer_names = args.dyn_trainers.split(",") - pipeline = get_pipeline(pipeline_name, args.extractor, args.profile, args.target_hints, args.bg_hints, args.isolator, abs_trainer_names, dyn_trainer_names, energy_sources, valid_feature_groups) + pipeline = get_pipeline(data_path, pipeline_name, args.extractor, args.profile, args.target_hints, args.bg_hints, args.abs_pipeline_name, args.isolator, abs_trainer_names, dyn_trainer_names, energy_sources, valid_feature_groups) if pipeline is None: print("cannot get pipeline") exit() @@ -484,7 +402,26 @@ def train(args): if metadata_df is not None: print(metadata_df.sort_values(by=[ERROR_KEY])[print_cols]) - warnings.resetwarnings() + warnings.resetwarnings() + +""" +estimate + + apply trained model of specified pipeline to predict power consumption from kepler metrics + +arguments: +- --input : specify kepler query response file (output of `query` function) +- --pipeline-name : specify trained pipeline name. +- --model-name : specified model name (optional, applying all models if not specified) +- --energy-source : specify target energy sources (use comma(,) as delimiter) +- --output-type : specify target output type (check https://sustainable-computing.io/kepler_model_server/pipeline/#power-isolation) - default: AbsPower +- --thirdparty-metrics : specify list of third party metric to export (required only for ThirdParty feature group) +- --profile : specify kepler query result when running no workload (required only for pipeline with profile isolator) +- Hints required only for pipeline with trainer isolator to separate background process can be specified by either of + - --target-hints : specify target process keywords (pod's substring with comma delimiter) to keep in DynPower model training + - --bg-hints : specify background process keywords to remove from DynPower model training + * If both are defined, target-hints will be considered first. 
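+- example (hypothetical invocation; the file and pipeline names are illustrative): python cmd/main.py estimate -i kepler_query -p my_pipeline -e rapl -ot AbsPower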
+""" def estimate(args): if not args.input: @@ -493,6 +430,12 @@ def estimate(args): from estimate import load_model, default_predicted_col_func, compute_error + # Inject thirdparty_metrics to FeatureGroup + if args.thirdparty_metrics != "": + update_thirdparty_metrics(args.thirdparty_metrics) + elif PROM_THIRDPARTY_METRICS != [""]: + update_thirdparty_metrics(PROM_THIRDPARTY_METRICS) + inputs = args.input.split(",") energy_sources = args.energy_source.split(",") input_query_results_list = [] @@ -526,7 +469,7 @@ def estimate(args): if pipeline_metadata is None: print("no metadata for pipeline {}.".format(pipeline_name)) continue - pipeline = get_pipeline(pipeline_name, args.extractor, args.profile, args.target_hints, args.bg_hints, pipeline_metadata["isolator"], pipeline_metadata["abs_trainers"], pipeline_metadata["dyn_trainers"], energy_sources, valid_fg) + pipeline = get_pipeline(data_path, pipeline_name, pipeline_metadata["extractor"], args.profile, args.target_hints, args.bg_hints, args.abs_pipeline_name, pipeline_metadata["isolator"], pipeline_metadata["abs_trainers"], pipeline_metadata["dyn_trainers"], energy_sources, valid_fg) if pipeline is None: print("cannot get pipeline {}.".format(pipeline_name)) continue @@ -596,107 +539,20 @@ def estimate(args): best_model_id_map[energy_source] = "{} using {}".format(path_splits[-1], path_splits[-2]) return best_result_map, power_labels_map, best_model_id_map, pd.DataFrame(summary_items) -def _ts_plot(data, cols, title, output_folder, name, labels=None, subtitles=None, ylabel=None): - plot_height = 3 - plot_width = 10 - import matplotlib.pyplot as plt - import seaborn as sns - sns.set(font_scale=1.2) - fig, axes = plt.subplots(len(cols), 1, figsize=(plot_width, len(cols)*plot_height)) - for i in range(0, len(cols)): - if len(cols) == 1: - ax = axes - else: - ax = axes[i] - if isinstance(cols[i], list): - # multiple lines - for j in range(0, len(cols[i])): - sns.lineplot(data=data, x=TIMESTAMP_COL, y=cols[i][j], ax=ax, label=labels[j]) - ax.set_title(subtitles[i]) - else: - sns.lineplot(data=data, x=TIMESTAMP_COL, y=cols[i], ax=ax) - ax.set_title(cols[i]) - if ylabel is not None: - ax.set_ylabel(ylabel) - plt.suptitle(title, x=0.5, y=0.99) - plt.tight_layout() - filename = os.path.join(output_folder, name + ".png") - fig.savefig(filename) - plt.close() - -def _feature_power_plot(data, model_id, output_type, energy_source, feature_cols, actual_power_cols, predicted_power_cols, output_folder, name): - plot_height = 5 - plot_width = 5 - - import matplotlib.pyplot as plt - import seaborn as sns - sns.set(font_scale=1.2) - row_num = len(feature_cols) - col_num = len(actual_power_cols) - width = max(10, col_num*plot_width) - fig, axes = plt.subplots(row_num, col_num, figsize=(width, row_num*plot_height)) - for xi in range(0, row_num): - feature_col = feature_cols[xi] - for yi in range(0, col_num): - if row_num == 1: - if col_num == 1: - ax = axes - else: - ax = axes[yi] - else: - if col_num == 1: - ax = axes[xi] - else: - ax = axes[xi][yi] - sorted_data = data.sort_values(by=[feature_col]) - sns.scatterplot(data=sorted_data, x=feature_col, y=actual_power_cols[yi], ax=ax, label="actual") - sns.lineplot(data=sorted_data, x=feature_col, y=predicted_power_cols[yi], ax=ax, label="predicted", color='C1') - if xi == 0: - ax.set_title(actual_power_cols[yi]) - if yi == 0: - ax.set_ylabel("Power (W)") - title = "{} {} prediction correlation \n by {}".format(energy_source, output_type, model_id) - plt.suptitle(title, x=0.5, y=0.99) - plt.tight_layout() - 
filename = os.path.join(output_folder, name + ".png") - fig.savefig(filename) - plt.close() - -def _summary_plot(energy_source, summary_df, output_folder, name): - if len(summary_df) == 0: - print("no summary data to plot") - return - - plot_height = 3 - plot_width = 20 - - import matplotlib.pyplot as plt - import seaborn as sns - sns.set(font_scale=1.2) - - energy_components = PowerSourceMap[energy_source] - col_num = len(energy_components) - fig, axes = plt.subplots(col_num, 1, figsize=(plot_width, plot_height*col_num)) - for i in range(0, col_num): - component = energy_components[i] - data = summary_df[(summary_df["energy_source"]==energy_source) & (summary_df["energy_component"]==component)] - data = data.sort_values(by=["Feature Group", "MAE"]) - if col_num == 1: - ax = axes - else: - ax = axes[i] - sns.barplot(data=data, x="Feature Group", y="MAE", hue="Model", ax=ax) - ax.set_title(component) - ax.set_ylabel("MAE (Watt)") - ax.set_ylim((0, 100)) - if i < col_num-1: - ax.set_xlabel("") - ax.legend(bbox_to_anchor=(1.05, 1.05)) - plt.suptitle("{} {} error".format(energy_source, args.output_type)) - plt.tight_layout() - filename = os.path.join(output_folder, name + ".png") - fig.savefig(filename) - plt.close() +""" +plot + + plot data (required results from `extract` and `isolate` functions) + +arguments: +- --target_data : specify target data (preprocess, estimate, error) + - `preprocess` plots time series of usage and power metrics for both AbsPower and DynPower + - `estimate` passes all arguments to `estimate` function, and plots the predicted time series and correlation between usage and power metrics + - `error` passes all arguments to `estimate` function, and plots the summary of prediction error +- --energy-source : specify target energy sources (use comma(,) as delimiter) +- --extractor : specify extractor to get preprocessed data of AbsPower model linked to the input data +- --isolator : specify isolator to get preprocessed data of DynPower model linked to the input data +""" def plot(args): pipeline_name = DEFAULT_PIPELINE if not args.pipeline_name else args.pipeline_name @@ -734,11 +590,11 @@ def plot(args): feature_cols = FeatureGroups[fg] power_cols = [col for col in data.columns if "power" in col] feature_data = data.groupby([TIMESTAMP_COL]).sum() - _ts_plot(feature_data, feature_cols, "Feature group: {}".format(fg.name), output_folder, data_filename) + ts_plot(feature_data, feature_cols, "Feature group: {}".format(fg.name), output_folder, data_filename) if not energy_plot: power_data = data.groupby([TIMESTAMP_COL]).max() data_filename = get_general_filename(args.target_data, energy_source, None, ot, args.extractor, args.isolator) - _ts_plot(power_data, power_cols, "Power source: {}".format(energy_source), output_folder, data_filename, ylabel="Power (W)") + ts_plot(power_data, power_cols, "Power source: {}".format(energy_source), output_folder, data_filename, ylabel="Power (W)") elif args.target_data == "estimate": from estimate import default_predicted_col_func from sklearn.preprocessing import MaxAbsScaler @@ -766,21 +622,36 @@ def plot(args): predicted_power_cols += [predicted_power_colname] data_filename = get_general_filename(args.target_data, energy_source, fg, ot, args.extractor, args.isolator) # plot prediction - _ts_plot(data, cols, "{} {} Prediction Result \n by {}".format(energy_source, ot.name, model_id), output_folder, "{}_{}".format(data_filename, model_id), subtitles=subtitles, labels=plot_labels, ylabel="Power (W)") + ts_plot(data, cols, "{} {} Prediction 
Result \n by {}".format(energy_source, ot.name, model_id), output_folder, "{}_{}".format(data_filename, model_id), subtitles=subtitles, labels=plot_labels, ylabel="Power (W)") # plot correlation to utilization if feature group is set if fg is not None: feature_cols = FeatureGroups[fg] scaler = MaxAbsScaler() data[feature_cols] = best_restult[[TIMESTAMP_COL] + feature_cols].groupby([TIMESTAMP_COL]).sum().sort_index() data[feature_cols] = scaler.fit_transform(data[feature_cols]) - _feature_power_plot(data, model_id, ot.name, energy_source, feature_cols, actual_power_cols, predicted_power_cols, output_folder, "{}_{}_corr".format(data_filename, model_id)) + feature_power_plot(data, model_id, ot.name, energy_source, feature_cols, actual_power_cols, predicted_power_cols, output_folder, "{}_{}_corr".format(data_filename, model_id)) elif args.target_data == "error": from estimate import default_predicted_col_func from sklearn.preprocessing import MaxAbsScaler _, _, _, summary_df = estimate(args) for energy_source in energy_sources: data_filename = get_general_filename(args.target_data, energy_source, fg, ot, args.extractor, args.isolator) - _summary_plot(energy_source, summary_df, output_folder, data_filename) + summary_plot(args, energy_source, summary_df, output_folder, data_filename) + +""" +export + + export preprocessed data and trained models to the kepler-model-db path + +arguments: +- --id : specify machine ID +- --pipeline-name : specify pipeline name that contains the trained models +- --output : specify kepler-model-db/models in local path +- --publisher : specify publisher (github) account +- --benchmark : specify benchmark file that contains data of start time and end time which could be either of the following format + - CPE benchmark resource in json if you run workload with CPE-operator (https://github.com/IBM/cpe-operator) + - custom benchmark in json with `startTimeUTC` and `endTimeUTC` data +""" def export(args): if not args.id: @@ -809,7 +680,7 @@ def export(args): pipeline_path = get_pipeline_path(data_path, pipeline_name=pipeline_name) machine_path = get_machine_path(output_path, args.version, machine_id) - collect_date, _ = extract_time(args.benchmark) + collect_date, _ = extract_time(data_path, args.benchmark) exporter.export(data_path, pipeline_path, machine_path, machine_id=machine_id, version=args.version, publisher=args.publisher, collect_date=collect_date, include_raw=args.include_raw) args.energy_source = ",".join(PowerSourceMap.keys()) @@ -838,6 +709,16 @@ def export(args): args.feature_group = feature_group plot(args) +""" +plot_scenario + + separatedly plot data on specified scenario. This function is now limited to only CPE-based benchmark. + +arguments: +- --benchmark : specify CPE benchmark resource in json file +- Please refer to `plot` function for the rest arguments. 
@@ -907,20 +788,19 @@ def plot_scenario(args):
                     feature_cols = FeatureGroups[fg]
                     power_cols = [col for col in data.columns if "power" in col]
                     feature_data = data.groupby([TIMESTAMP_COL]).sum()
-                    _ts_plot(feature_data, feature_cols, "Feature group: {} ({})".format(fg.name, args.scenario), output_folder, data_filename)
+                    ts_plot(feature_data, feature_cols, "Feature group: {} ({})".format(fg.name, args.scenario), output_folder, data_filename)
                 if not energy_plot:
                     power_data = data.groupby([TIMESTAMP_COL]).max()
                     data_filename = get_general_filename(args.target_data, energy_source, None, ot, args.extractor, args.isolator) + "_" + args.scenario
-                    _ts_plot(power_data, power_cols, "Power source: {} ({})".format(energy_source, args.scenario), output_folder, data_filename, ylabel="Power (W)")
+                    ts_plot(power_data, power_cols, "Power source: {} ({})".format(energy_source, args.scenario), output_folder, data_filename, ylabel="Power (W)")

 if __name__ == "__main__":
-    # set model top path to data path
-    os.environ['MODEL_PATH'] = data_path
-
     # Create an ArgumentParser object
     parser = argparse.ArgumentParser(description="Kepler model server entrypoint")
     parser.add_argument("command", type=str, help="The command to execute.")
+    parser.add_argument("--data-path", type=str, help="Specify data path.", default=data_path)
+
     # Common arguments
     parser.add_argument("-i", "--input", type=str, help="Specify input file/folder name.", default="")
     parser.add_argument("-o", "--output", type=str, help="Specify output file/folder name", default=default_output_filename)
@@ -941,9 +821,11 @@ def plot_scenario(args):
     parser.add_argument("--profile", type=str, help="Specify profile input (required for trainer and profile isolator).")
     parser.add_argument("--target-hints", type=str, help="Specify dynamic workload container name hints (used by TrainIsolator)")
     parser.add_argument("--bg-hints", type=str, help="Specify background workload container name hints (used by TrainIsolator)")
+    parser.add_argument("--abs-pipeline-name", type=str, help="Specify AbsPower model pipeline (used by TrainIsolator)", default="")
     parser.add_argument("-e", "--energy-source", type=str, help="Specify energy source.", default="rapl")
-    parser.add_argument("--abs-trainers", type=str, help="Specify trainer names (use comma(,) as delimiter).", default=default_trainers)
-    parser.add_argument("--dyn-trainers", type=str, help="Specify trainer names (use comma(,) as delimiter).", default=default_trainers)
+    parser.add_argument("--abs-trainers", type=str, help="Specify trainer names for the train command (use comma(,) as delimiter).", default="default")
+    parser.add_argument("--dyn-trainers", type=str, help="Specify trainer names for the train command (use comma(,) as delimiter).", default="default")
+    parser.add_argument("--trainers", type=str, help="Specify trainer names for the train_from_data command (use comma(,) as delimiter).", default="XgboostFitTrainer")

     # Validate arguments
     parser.add_argument("--benchmark", type=str, help="Specify benchmark file name.")
@@ -966,14 +848,20 @@ def plot_scenario(args):

     # Parse the command-line arguments
     args = parser.parse_args()

-    if not os.path.exists(data_path):
-        print("{} must be mount, add -v \"$(pwd)\":{} .".format(data_path, data_path))
-        exit()
+    # set model top path to data path
+    data_path = args.data_path

     # Check if the required argument is provided
     if not args.command:
         parser.print_help()
     else:
+        if not os.path.exists(data_path):
+            if args.command == "query":
+                os.makedirs(data_path)
+                print("creating new folder for data: {}".format(data_path))
+            else:
+                print("{} must be mounted, add -v \"$(pwd)\":{} to the docker run command.".format(data_path, data_path))
+                exit()
         getattr(sys.modules[__name__], args.command)(args)
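With the new `--data-path` flag, the data folder no longer has to pre-exist for the `query` command, which now creates it on demand. A hedged usage sketch follows; the mount target and the benchmark/output names are placeholders.

```
import subprocess

# Query Prometheus metrics into a mounted (or newly created) data folder.
# "/data" is a placeholder that should match the docker -v mount target;
# benchmark and output names are placeholders as well.
subprocess.run([
    "python", "cmd/main.py", "query",
    "--data-path", "/data",
    "--benchmark", "sample_benchmark",
    "-o", "sample_query_response",
], check=True)
```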
diff --git a/src/train/trainer/__init__.py b/src/train/trainer/__init__.py
index 7bf72d5b..48361f55 100644
--- a/src/train/trainer/__init__.py
+++ b/src/train/trainer/__init__.py
@@ -242,7 +242,9 @@ def save_model_and_metadata(self, node_type, X_test_map, y_test_map):
         item = self.get_basic_metadata(node_type)
         for component in self.energy_components:
             mae = self.get_mae(node_type, component, X_test_map[component], y_test_map[component])
+            mae = round(mae, 2)
             mape = self.get_mape(node_type, component, X_test_map[component], y_test_map[component])
+            mape = round(mape, 2)
             if max_mae is None or mae > max_mae:
                 max_mae = mae
             if max_mape is None or mape > max_mape:
diff --git a/src/train/trainer/scikit.py b/src/train/trainer/scikit.py
index d35d33a3..77c1bcbb 100644
--- a/src/train/trainer/scikit.py
+++ b/src/train/trainer/scikit.py
@@ -60,7 +60,7 @@ def get_mape(self, node_type, component, X_test, y_test):
         predicted_values = self.predict(node_type, component, X_test, skip_preprocess=True)
         non_zero_predicted_values = np.array([predicted_values[i] for i in range(len(predicted_values)) if y_test[i] > 0])
         if len(non_zero_predicted_values) == 0:
-            return 0
+            return -1
         non_zero_y_test = np.array([y for y in y_test if y > 0])
         absolute_percentage_errors = np.abs((non_zero_y_test - non_zero_predicted_values) / non_zero_y_test) * 100
         mape = np.mean(absolute_percentage_errors)
diff --git a/src/train/trainer/xgboost_interface.py b/src/train/trainer/xgboost_interface.py
index fb151bc9..e4823104 100644
--- a/src/train/trainer/xgboost_interface.py
+++ b/src/train/trainer/xgboost_interface.py
@@ -80,7 +80,7 @@ def get_mape(self, node_type, component, X_test, y_test):
         predicted_values = self.predict(node_type, component, X_test, skip_preprocess=True)
         non_zero_predicted_values = np.array([predicted_values[i] for i in range(len(predicted_values)) if y_test[i] > 0])
         if len(non_zero_predicted_values) == 0:
-            return 0
+            return -1
         non_zero_y_test = np.array([y for y in y_test if y > 0])
         absolute_percentage_errors = np.abs((non_zero_y_test - non_zero_predicted_values) / non_zero_y_test) * 100
         mape = np.mean(absolute_percentage_errors)
diff --git a/src/util/extract_types.py b/src/util/extract_types.py
index d8ccc902..6affa95d 100644
--- a/src/util/extract_types.py
+++ b/src/util/extract_types.py
@@ -36,4 +36,9 @@ def get_unit_vals(power_columns):
 def get_num_of_unit(energy_source, label_cols):
     energy_components = PowerSourceMap(energy_source)
     num_of_unit = len(label_cols)/len(energy_components)
-    return num_of_unit
\ No newline at end of file
+    return num_of_unit
+
+def get_expected_power_columns(energy_components, num_of_unit=1):
+    # TODO: if ratio applied,
+    # return [component_to_col(component, "package", unit_val) for component in energy_components for unit_val in range(0,num_of_unit)]
+    return [component_to_col(component) for component in energy_components]
\ No newline at end of file
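As a standalone illustration of the changed guard in both trainers: when no test sample has a positive ground-truth value, MAPE is undefined, so the trainers now report -1 as a sentinel instead of a misleading 0. A minimal sketch of that logic (the function name is illustrative, not from the source):

```
import numpy as np

def mape_or_sentinel(y_test, predicted_values):
    # Keep only samples whose ground truth is positive, since MAPE divides by y_test.
    non_zero_predicted = np.array([p for p, y in zip(predicted_values, y_test) if y > 0])
    if len(non_zero_predicted) == 0:
        # No valid sample: MAPE is undefined, so return the -1 sentinel used above.
        return -1
    non_zero_y = np.array([y for y in y_test if y > 0])
    return np.mean(np.abs((non_zero_y - non_zero_predicted) / non_zero_y) * 100)
```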