From 9bef6961b13db33d1618b1b4eceabe48364fba7e Mon Sep 17 00:00:00 2001 From: Krishnasuri Narayanam Date: Fri, 10 Nov 2023 10:53:10 +0530 Subject: [PATCH 1/4] changes to collect metrics from Prometheus and train model with benchmark run outside kepler-model-server Signed-off-by: Krishnasuri Narayanam --- cmd/main.py | 93 +++++++++++++++++++++++++++++++-- model_training/README.md | 41 +++++++++++++-- model_training/script.sh | 38 ++++++++++---- src/train/exporter/validator.py | 2 +- src/util/prom_types.py | 1 + src/util/saver.py | 2 +- 6 files changed, 157 insertions(+), 20 deletions(-) diff --git a/cmd/main.py b/cmd/main.py index d70ca2e8..1d26ea88 100644 --- a/cmd/main.py +++ b/cmd/main.py @@ -19,7 +19,7 @@ sys.path.append(src_path) from util.prom_types import PROM_SERVER, PROM_QUERY_INTERVAL, PROM_QUERY_STEP, PROM_HEADERS, PROM_SSL_DISABLE, PROM_THIRDPARTY_METRICS -from util.prom_types import metric_prefix as KEPLER_METRIC_PREFIX, node_info_column, prom_responses_to_results, TIMESTAMP_COL, feature_to_query, update_thirdparty_metrics +from util.prom_types import metric_prefix as KEPLER_METRIC_PREFIX, custom_metrics as CUSTOM_METRICS, node_info_column, prom_responses_to_results, TIMESTAMP_COL, feature_to_query, update_thirdparty_metrics from util.train_types import ModelOutputType, FeatureGroup, FeatureGroups, is_single_source_feature_group, SingleSourceFeatures from util.loader import load_json, DEFAULT_PIPELINE, load_pipeline_metadata, get_pipeline_path, get_model_group_path, list_pipelines, list_model_names, load_metadata, load_csv, get_machine_path, get_preprocess_folder, get_general_filename from util.saver import save_json, save_csv, save_train_args @@ -49,6 +49,15 @@ def extract_time(benchmark_filename): print(UTC_OFFSET_TIMEDELTA) return start-UTC_OFFSET_TIMEDELTA, end-UTC_OFFSET_TIMEDELTA +def custom_extract_time(benchmark_filename): + data = load_json(data_path, benchmark_filename) + start_str = data["startTimeUTC"] + start = datetime.datetime.strptime(start_str, '%Y-%m-%dT%H:%M:%SZ') + end_str = data["endTimeUTC"] + end = datetime.datetime.strptime(end_str, '%Y-%m-%dT%H:%M:%SZ') + print(UTC_OFFSET_TIMEDELTA) + return start-UTC_OFFSET_TIMEDELTA, end-UTC_OFFSET_TIMEDELTA + def summary_validation(validate_df): if len(validate_df) == 0: print("No data for validation.") @@ -181,6 +190,30 @@ def get_validate_df(benchmark_filename, query_response): print(validate_df.groupby(["scenarioID", "query"]).sum()[["count", ">0"]]) return validate_df +def get_custom_validate_df(benchmark_filename, query_response, queries): + items = [] + query_results = prom_responses_to_results(query_response) + print("queries: {}".format(queries)) + for query in queries: + df = query_results[query] + if df.empty: + continue + print("query: {}".format(query)) + print("df: {}".format(df)) + # set validate item + item = dict() + item["pod"] = benchmark_filename + item["scenarioID"] = "" + item["query"] = query + item["count"] = len(df) + item[">0"] = len(df[df[query] > 0]) + item["total"] = df[query].max() + items += [item] + validate_df = pd.DataFrame(items) + if not validate_df.empty: + print(validate_df.groupby(["scenarioID", "query"]).sum()[["count", ">0"]]) + return validate_df + def check_ot_fg(args, valid_fg): ot = None fg = None @@ -232,6 +265,56 @@ def query(args): summary_validation(validate_df) save_csv(path=data_path, name=args.output + "_validate_result", data=validate_df) +def custom_query(args): + from prometheus_api_client import PrometheusConnect + prom = PrometheusConnect(url=args.server, 
headers=PROM_HEADERS, disable_ssl=PROM_SSL_DISABLE) + benchmark_filename = args.input + filepath = os.path.join(data_path, benchmark_filename+".json") + if not os.path.isfile(filepath): + print("Query last {} interval.".format(args.interval)) + end = datetime.datetime.now(datetime.timezone.utc) + start = end - datetime.timedelta(seconds=args.interval) + item = dict() + item["startTimeUTC"] = start.strftime("%Y-%m-%dT%H:%M:%SZ") + item["endTimeUTC"] = end.strftime("%Y-%m-%dT%H:%M:%SZ") + save_json(path=data_path, name=benchmark_filename, data=item) + print("Warning: Re-run after checking if start and end timestamps are as desired in the file {}".format(filepath)) + exit() + else: + print("Query from {}.".format(benchmark_filename)) + start, end = custom_extract_time(benchmark_filename) + available_metrics = prom.all_metrics() + queries = [m for m in available_metrics if args.metric_prefix in m] + custom_metrics_list = args.custom_metrics.split(",") + queries.extend(custom_metrics_list) + + print("Start {} End {}".format(start, end)) + response = _range_queries(prom, queries, start, end, args.step, None) + save_json(path=data_path, name=args.output, data=response) + # perform custom validation + validate_df = get_custom_validate_df(benchmark_filename, response, queries) + summary_validation(validate_df) + save_csv(path=data_path, name=args.output + "_validate_result", data=validate_df) + +def custom_validate(args): + from prometheus_api_client import PrometheusConnect + prom = PrometheusConnect(url=args.server, headers=PROM_HEADERS, disable_ssl=PROM_SSL_DISABLE) + available_metrics = prom.all_metrics() + queries = [m for m in available_metrics if args.metric_prefix in m] + custom_metrics_list = args.custom_metrics.split(",") + queries.extend(custom_metrics_list) + + if not args.benchmark: + print("Need --benchmark") + exit() + + response_filename = args.input + response = load_json(data_path, response_filename) + validate_df = get_custom_validate_df(args.benchmark, response, queries) + summary_validation(validate_df) + if args.output: + save_csv(path=data_path, name=args.output, data=validate_df) + def validate(args): if not args.benchmark: print("Need --benchmark") @@ -655,7 +738,7 @@ def plot(args): energy_sources = args.energy_source.split(",") output_folder = os.path.join(data_path, args.output) if not os.path.exists(output_folder): - os.mkdir(output_folder) + os.makedirs(output_folder, exist_ok=True) if args.target_data == "preprocess": data_saved_path = get_preprocess_folder(pipeline_path) feature_plot = [] @@ -752,7 +835,10 @@ def export(args): pipeline_path = get_pipeline_path(data_path, pipeline_name=pipeline_name) machine_path = get_machine_path(output_path, args.version, machine_id) - collect_date, _ = extract_time(args.benchmark) + if args.benchmark == "customBenchmark": + collect_date, _ = custom_extract_time(args.benchmark) + else: + collect_date, _ = extract_time(args.benchmark) exporter.export(data_path, pipeline_path, machine_path, machine_id=machine_id, version=args.version, publisher=args.publisher, collect_date=collect_date, include_raw=args.include_raw) args.energy_source = ",".join(PowerSourceMap.keys()) @@ -873,6 +959,7 @@ def plot_scenario(args): parser.add_argument("--step", type=str, help="Specify query step.", default=PROM_QUERY_STEP) parser.add_argument("--metric-prefix", type=str, help="Specify metrix prefix to filter.", default=KEPLER_METRIC_PREFIX) parser.add_argument("-tm", "--thirdparty-metrics", nargs='+', help="Specify the thirdparty metrics that not 
included by Kepler", default="")
+    parser.add_argument("--custom-metrics", type=str, help="Specify comma seperated custom metrics list.", default=CUSTOM_METRICS)
 
     # Train arguments
     parser.add_argument("-p", "--pipeline-name", type=str, help="Specify pipeline name.")
diff --git a/model_training/README.md b/model_training/README.md
index 88f6900e..11c856c2 100644
--- a/model_training/README.md
+++ b/model_training/README.md
@@ -54,7 +54,31 @@ This is only for testing purpose.
 It might take an hour to run and collect all benchmarks. Output including CPE CR and Prometheus query response will be in `data` folder by default.
 
-## 3. Profile and train
+## 3. Collect metrics without running benchmark
+Users might want to run a custom benchmark outside of `kepler-model-server` and collect metrics for training the model using `kepler-model-server`.
+
+### Custom Benchmark
+
+    ./script.sh custom_collect
+
+When this command is run, it creates a file `customBenchmark.json` with a default start and end timestamps for querying metrics from Prometheus. If the desired time window for metrics collection is different, manually update the timestamps in that file accordinly. Then run again the same command to collect the metrics.
+
+## 4. Validate collected metrics
+Validation of metrics happens by default at the time of their collection. It is also possible to validate the collected metrics explicitly.
+
+### Quick sample
+
+    ./script.sh validate sample
+
+### Full run
+
+    ./script.sh validate stressng
+
+### Custom benchmark
+
+    ./script.sh validate customBenchmark
+
+## 5. Profile and train
 
 You can train the model by using docker image which require no environment setup but can be limited by docker limitation or train the model natively by setting up your python environment as follows.
 
@@ -73,6 +97,12 @@ You can train the model by using docker image which require no environment setup
 ./script.sh train
 ```
 
+#### Custom benchmark
+
+```
+./script.sh custom_train
+```
+
 Training output will be in `/data` folder by default. The folder contains:
 - preprocessed data from Prometheus query response
 - profiles
@@ -106,20 +136,21 @@ Run
 
 1. Fork `kepler-model-db`.
 
-1. Validate and make a copy by export command. Need to define `machine id`, `local path to forked kepler-model-db/models`, and `author github account`.
+1. Validate and make a copy with the export command. You need to define `machine id`, `local path to forked kepler-model-db/models`, `author github account`, and `benchmark type`.
 
     Run
 
    ```
-   ./script.sh export <machine id> <path to kepler-model-db/models> <author github account>
+   ./script.sh export <machine id> <path to kepler-model-db/models> <author github account> <benchmark type>
   ```
 
-   If you are agree to also share the raw data (preprocessed data and archived file of full pipeline), run
+   If you also agree to share the raw data (preprocessed data and archived file of full pipeline), run
 
    ```
-   ./script.sh export_with_raw <machine id> <path to kepler-model-db/models> <author github account>
+   ./script.sh export_with_raw <machine id> <path to kepler-model-db/models> <author github account> <benchmark type>
   ```
 
    - set `NATIVE="true"` to export natively.
+   - Benchmark type accepts one of the values `sample`, `stressng`, or `customBenchmark`.
 
 2. Add information of your machine in `./models/README.md` in `kepler-model-db`. You may omit any column as needed.
 3. Push PR to `kepler-model-db.
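For reference, the time-window file used by the custom collection flow holds just two UTC timestamps. The file name, keys, and timestamp format in the sketch below are taken from the `custom_query`/`custom_extract_time` code in this patch; the `data` directory and the one-hour window are illustrative assumptions.

```
# Minimal sketch: write data/customBenchmark.json with an explicit time window.
# Keys and timestamp format follow custom_query()/custom_extract_time() above;
# the "data" path and the one-hour window are assumptions for illustration.
import datetime
import json
import os

data_path = "data"  # assumed default data folder (the CPE_DATAPATH mount)
end = datetime.datetime.now(datetime.timezone.utc)
start = end - datetime.timedelta(hours=1)

window = {
    "startTimeUTC": start.strftime("%Y-%m-%dT%H:%M:%SZ"),
    "endTimeUTC": end.strftime("%Y-%m-%dT%H:%M:%SZ"),
}

os.makedirs(data_path, exist_ok=True)
with open(os.path.join(data_path, "customBenchmark.json"), "w") as f:
    json.dump(window, f, indent=2)
```

With this file in place, re-running `./script.sh custom_collect` queries Prometheus over exactly that window.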
diff --git a/model_training/script.sh b/model_training/script.sh index 073bc7c3..b07c41e6 100755 --- a/model_training/script.sh +++ b/model_training/script.sh @@ -142,6 +142,12 @@ function collect_data() { kubectl delete -f benchmark/${BENCHMARK}.yaml } +function custom_collect_data() { + BENCHMARK=$1 + ARGS="-i ${BENCHMARK} -o ${BENCHMARK}_kepler_query -s ${PROM_SERVER}" + python ../cmd/main.py custom_query ${ARGS}|| true +} + function deploy_prom_dependency(){ kubectl apply -f deployment/prom-kepler-rbac.yaml kubectl apply -f deployment/prom-np.yaml @@ -176,23 +182,35 @@ function collect() { collect_data stressng cpe-operator-system 60 } +function custom_collect() { + custom_collect_data customBenchmark +} + function quick_collect() { collect_data sample cpe-operator-system 10 } function train() { - train_model stressng_kepler_query ${VERSION} + train_model stressng_kepler_query ${VERSION}_stressng } function quick_train() { train_model sample_kepler_query ${VERSION}_sample } +function custom_train() { + train_model customBenchmark_kepler_query ${VERSION}_customBenchmark +} + function validate() { BENCHMARK=$1 ARGS="-i ${BENCHMARK}_kepler_query --benchmark ${BENCHMARK}" - if [ -z "$NATIVE" ]; then + if [ -z "$NATIVE" -a "${BENCHMARK}" == "customBenchmark" ]; then + docker run --rm -v $CPE_DATAPATH:/data ${ENTRYPOINT_IMG} custom_validate ${ARGS} + elif [ -z "$NATIVE" ]; then docker run --rm -v $CPE_DATAPATH:/data ${ENTRYPOINT_IMG} validate ${ARGS} + elif [ "${BENCHMARK}" == "customBenchmark" ]; then + python ../cmd/main.py custom_validate ${ARGS}|| true else python ../cmd/main.py validate ${ARGS}|| true fi @@ -202,16 +220,16 @@ function _export() { ID=$1 OUTPUT=$2 PUBLISHER=$3 - INCLUDE_RAW=$4 + MAIN_COLLECT_INPUT=$4 + INCLUDE_RAW=$5 - if [ $# -lt 3 ]; then - echo "need arguements: [machine_id] [path_to_models] [publisher]" + if [ $# -lt 4 ]; then + echo "need arguements: [machine_id] [path_to_models] [publisher] [benchmark_type]" exit 2 fi - PIPELINE_NAME=${PIPELINE_PREFIX}${VERSION} - VALIDATE_INPUT="stressng_kepler_query" - MAIN_COLLECT_INPUT="stressng" + PIPELINE_NAME=${PIPELINE_PREFIX}${VERSION}_${MAIN_COLLECT_INPUT} + VALIDATE_INPUT="${MAIN_COLLECT_INPUT}_kepler_query" ARGS="--id ${ID} -p ${PIPELINE_NAME} -i ${VALIDATE_INPUT} --benchmark ${MAIN_COLLECT_INPUT} --version ${VERSION} --publisher ${PUBLISHER} ${INCLUDE_RAW}" echo "${ARGS}" if [ -z "$NATIVE" ]; then @@ -222,11 +240,11 @@ function _export() { } function export() { - _export $1 $2 $3 + _export $1 $2 $3 $4 } function export_with_raw() { - _export $1 $2 $3 "--include-raw true" + _export $1 $2 $3 $4 "--include-raw true" } function cleanup() { diff --git a/src/train/exporter/validator.py b/src/train/exporter/validator.py index 851f952c..b9618bed 100644 --- a/src/train/exporter/validator.py +++ b/src/train/exporter/validator.py @@ -7,7 +7,7 @@ from loader import load_train_args from config import ERROR_KEY -required_benchmark = ["stressng_kepler_query"] +required_benchmark = ["sample_kepler_query", "stressng_kepler_query", "customBenchmark_kepler_query"] default_threshold_percent = 20 diff --git a/src/util/prom_types.py b/src/util/prom_types.py index c51058a7..41d80f37 100644 --- a/src/util/prom_types.py +++ b/src/util/prom_types.py @@ -18,6 +18,7 @@ PROM_THIRDPARTY_METRICS = getConfig('PROM_THIRDPARTY_METRICS', "").split(',') metric_prefix = "kepler_" +custom_metrics = "node_cpu_seconds_total" TIMESTAMP_COL = "timestamp" PACKAGE_COL = "package" SOURCE_COL = "source" diff --git a/src/util/saver.py b/src/util/saver.py index 
d1030ef2..2f67f95f 100644 --- a/src/util/saver.py +++ b/src/util/saver.py @@ -14,7 +14,7 @@ def assure_path(path): if path == '': return '' if not os.path.exists(path): - os.mkdir(path) + os.makedirs(path, exist_ok=True) return path def save_json(path, name, data): From 91eec3c748e94b74c697a72e6869490b96a29c88 Mon Sep 17 00:00:00 2001 From: Krishnasuri Narayanam Date: Mon, 13 Nov 2023 11:02:07 +0530 Subject: [PATCH 2/4] changes to address the review comments Signed-off-by: Krishnasuri Narayanam --- cmd/main.py | 164 ++++++++++++++++----------------------- model_training/README.md | 2 +- model_training/script.sh | 26 +++---- src/util/prom_types.py | 5 +- 4 files changed, 82 insertions(+), 115 deletions(-) diff --git a/cmd/main.py b/cmd/main.py index 1d26ea88..3ccd8d78 100644 --- a/cmd/main.py +++ b/cmd/main.py @@ -18,8 +18,8 @@ src_path = os.path.join(os.path.dirname(__file__), '..', 'src') sys.path.append(src_path) -from util.prom_types import PROM_SERVER, PROM_QUERY_INTERVAL, PROM_QUERY_STEP, PROM_HEADERS, PROM_SSL_DISABLE, PROM_THIRDPARTY_METRICS -from util.prom_types import metric_prefix as KEPLER_METRIC_PREFIX, custom_metrics as CUSTOM_METRICS, node_info_column, prom_responses_to_results, TIMESTAMP_COL, feature_to_query, update_thirdparty_metrics +from util.prom_types import PROM_SERVER, PROM_QUERY_INTERVAL, PROM_QUERY_STEP, PROM_QUERY_START_TIME, PROM_QUERY_END_TIME, PROM_HEADERS, PROM_SSL_DISABLE, PROM_THIRDPARTY_METRICS +from util.prom_types import metric_prefix as KEPLER_METRIC_PREFIX, node_info_column, prom_responses_to_results, TIMESTAMP_COL, feature_to_query, update_thirdparty_metrics from util.train_types import ModelOutputType, FeatureGroup, FeatureGroups, is_single_source_feature_group, SingleSourceFeatures from util.loader import load_json, DEFAULT_PIPELINE, load_pipeline_metadata, get_pipeline_path, get_model_group_path, list_pipelines, list_model_names, load_metadata, load_csv, get_machine_path, get_preprocess_folder, get_general_filename from util.saver import save_json, save_csv, save_train_args @@ -42,19 +42,16 @@ def print_file_to_stdout(args): def extract_time(benchmark_filename): data = load_json(data_path, benchmark_filename) - start_str = data["metadata"]["creationTimestamp"] - start = datetime.datetime.strptime(start_str, '%Y-%m-%dT%H:%M:%SZ') - end_str = data["status"]["results"][-1]["repetitions"][-1]["pushedTime"].split(".")[0] - end = datetime.datetime.strptime(end_str, '%Y-%m-%d %H:%M:%S') - print(UTC_OFFSET_TIMEDELTA) - return start-UTC_OFFSET_TIMEDELTA, end-UTC_OFFSET_TIMEDELTA - -def custom_extract_time(benchmark_filename): - data = load_json(data_path, benchmark_filename) - start_str = data["startTimeUTC"] - start = datetime.datetime.strptime(start_str, '%Y-%m-%dT%H:%M:%SZ') - end_str = data["endTimeUTC"] - end = datetime.datetime.strptime(end_str, '%Y-%m-%dT%H:%M:%SZ') + if benchmark_filename != "customBenchmark": + start_str = data["metadata"]["creationTimestamp"] + start = datetime.datetime.strptime(start_str, '%Y-%m-%dT%H:%M:%SZ') + end_str = data["status"]["results"][-1]["repetitions"][-1]["pushedTime"].split(".")[0] + end = datetime.datetime.strptime(end_str, '%Y-%m-%d %H:%M:%S') + else: + start_str = data["startTimeUTC"] + start = datetime.datetime.strptime(start_str, '%Y-%m-%dT%H:%M:%SZ') + end_str = data["endTimeUTC"] + end = datetime.datetime.strptime(end_str, '%Y-%m-%dT%H:%M:%SZ') print(UTC_OFFSET_TIMEDELTA) return start-UTC_OFFSET_TIMEDELTA, end-UTC_OFFSET_TIMEDELTA @@ -110,11 +107,11 @@ def summary_validation(validate_df): def 
get_validate_df(benchmark_filename, query_response): items = [] query_results = prom_responses_to_results(query_response) - queries = [query for query in query_results.keys() if "container" in query] + container_queries = [query for query in query_results.keys() if "container" in query] status_data = load_json(data_path, benchmark_filename) - if status_data is None: + if status_data is None or status_data.get("status", None) == None: # select all with keyword - for query in queries: + for query in container_queries: df = query_results[query] if len(df) == 0: # set validate item // no value @@ -150,7 +147,7 @@ def get_validate_df(benchmark_filename, query_response): repetitions = result["repetitions"] for rep in repetitions: podname = rep["pod"] - for query in queries: + for query in container_queries: df = query_results[query] if len(df) == 0: # set validate item // no value @@ -176,30 +173,40 @@ def get_validate_df(benchmark_filename, query_response): energy_queries = [query for query in query_results.keys() if "_joules" in query] for query in energy_queries: df = query_results[query] - filtered_df = df.copy() + if len(df) == 0: + # set validate item // no value + item = dict() + item["pod"] = "" + item["scenarioID"] = "" + item["query"] = query + item["count"] = 0 + item[">0"] = 0 + item["total"] = 0 + items += [item] + continue # set validate item item = dict() item["pod"] = "" item["scenarioID"] = "" item["query"] = query - item["count"] = len(filtered_df) - item[">0"] = len(filtered_df[filtered_df[query] > 0]) - item["total"] = filtered_df[query].max() + item["count"] = len(df) + item[">0"] = len(df[df[query] > 0]) + item["total"] = df[query].max() items += [item] - validate_df = pd.DataFrame(items) - print(validate_df.groupby(["scenarioID", "query"]).sum()[["count", ">0"]]) - return validate_df - -def get_custom_validate_df(benchmark_filename, query_response, queries): - items = [] - query_results = prom_responses_to_results(query_response) - print("queries: {}".format(queries)) - for query in queries: + other_queries = [query for query in query_results.keys() if query not in container_queries and query not in energy_queries] + for query in other_queries: df = query_results[query] - if df.empty: + if len(df) == 0: + # set validate item // no value + item = dict() + item["pod"] = benchmark_filename + item["scenarioID"] = "" + item["query"] = query + item["count"] = 0 + item[">0"] = 0 + item["total"] = 0 + items += [item] continue - print("query: {}".format(query)) - print("df: {}".format(df)) # set validate item item = dict() item["pod"] = benchmark_filename @@ -240,9 +247,27 @@ def query(args): prom = PrometheusConnect(url=args.server, headers=PROM_HEADERS, disable_ssl=PROM_SSL_DISABLE) benchmark_filename = args.input if benchmark_filename == "": - print("Query last {} interval.".format(args.interval)) - end = datetime.datetime.now() - start = end - datetime.timedelta(seconds=args.interval) + print("need name of the benchmark: sample, stressng, or customBenchmark") + exit() + filepath = os.path.join(data_path, benchmark_filename+".json") + if not os.path.isfile(filepath): + if args.start_time != "" and args.end_time != "": + # by [start time, end time] + print("Query from start_time {} to end_time {}.".format(args.start_time, args.end_time)) + start = datetime.datetime.strptime(args.start_time, '%Y-%m-%dT%H:%M:%SZ') + end = datetime.datetime.strptime(args.end_time , '%Y-%m-%dT%H:%M:%SZ') + else: + # by interval + print("Query last {} interval.".format(args.interval)) + end = 
datetime.datetime.now(datetime.timezone.utc) + start = end - datetime.timedelta(seconds=args.interval) + # save benchmark + item = dict() + item["startTimeUTC"] = start.strftime("%Y-%m-%dT%H:%M:%SZ") + item["endTimeUTC"] = end.strftime("%Y-%m-%dT%H:%M:%SZ") + save_json(path=data_path, name=benchmark_filename, data=item) + start = datetime.datetime.strptime(item["startTimeUTC"], '%Y-%m-%dT%H:%M:%SZ') - UTC_OFFSET_TIMEDELTA + end = datetime.datetime.strptime(item["endTimeUTC"], '%Y-%m-%dT%H:%M:%SZ') - UTC_OFFSET_TIMEDELTA else: print("Query from {}.".format(benchmark_filename)) start, end = extract_time(benchmark_filename) @@ -260,61 +285,10 @@ def query(args): response = _range_queries(prom, queries, start, end, args.step, None) save_json(path=data_path, name=args.output, data=response) # try validation if applicable - if benchmark_filename != "" and args.metric_prefix == KEPLER_METRIC_PREFIX: - validate_df = get_validate_df(benchmark_filename, response) - summary_validation(validate_df) - save_csv(path=data_path, name=args.output + "_validate_result", data=validate_df) - -def custom_query(args): - from prometheus_api_client import PrometheusConnect - prom = PrometheusConnect(url=args.server, headers=PROM_HEADERS, disable_ssl=PROM_SSL_DISABLE) - benchmark_filename = args.input - filepath = os.path.join(data_path, benchmark_filename+".json") - if not os.path.isfile(filepath): - print("Query last {} interval.".format(args.interval)) - end = datetime.datetime.now(datetime.timezone.utc) - start = end - datetime.timedelta(seconds=args.interval) - item = dict() - item["startTimeUTC"] = start.strftime("%Y-%m-%dT%H:%M:%SZ") - item["endTimeUTC"] = end.strftime("%Y-%m-%dT%H:%M:%SZ") - save_json(path=data_path, name=benchmark_filename, data=item) - print("Warning: Re-run after checking if start and end timestamps are as desired in the file {}".format(filepath)) - exit() - else: - print("Query from {}.".format(benchmark_filename)) - start, end = custom_extract_time(benchmark_filename) - available_metrics = prom.all_metrics() - queries = [m for m in available_metrics if args.metric_prefix in m] - custom_metrics_list = args.custom_metrics.split(",") - queries.extend(custom_metrics_list) - - print("Start {} End {}".format(start, end)) - response = _range_queries(prom, queries, start, end, args.step, None) - save_json(path=data_path, name=args.output, data=response) - # perform custom validation - validate_df = get_custom_validate_df(benchmark_filename, response, queries) + validate_df = get_validate_df(benchmark_filename, response) summary_validation(validate_df) save_csv(path=data_path, name=args.output + "_validate_result", data=validate_df) -def custom_validate(args): - from prometheus_api_client import PrometheusConnect - prom = PrometheusConnect(url=args.server, headers=PROM_HEADERS, disable_ssl=PROM_SSL_DISABLE) - available_metrics = prom.all_metrics() - queries = [m for m in available_metrics if args.metric_prefix in m] - custom_metrics_list = args.custom_metrics.split(",") - queries.extend(custom_metrics_list) - - if not args.benchmark: - print("Need --benchmark") - exit() - - response_filename = args.input - response = load_json(data_path, response_filename) - validate_df = get_custom_validate_df(args.benchmark, response, queries) - summary_validation(validate_df) - if args.output: - save_csv(path=data_path, name=args.output, data=validate_df) - def validate(args): if not args.benchmark: print("Need --benchmark") @@ -835,10 +809,7 @@ def export(args): pipeline_path = get_pipeline_path(data_path, 
pipeline_name=pipeline_name) machine_path = get_machine_path(output_path, args.version, machine_id) - if args.benchmark == "customBenchmark": - collect_date, _ = custom_extract_time(args.benchmark) - else: - collect_date, _ = extract_time(args.benchmark) + collect_date, _ = extract_time(args.benchmark) exporter.export(data_path, pipeline_path, machine_path, machine_id=machine_id, version=args.version, publisher=args.publisher, collect_date=collect_date, include_raw=args.include_raw) args.energy_source = ",".join(PowerSourceMap.keys()) @@ -956,10 +927,11 @@ def plot_scenario(args): # Query arguments parser.add_argument("-s", "--server", type=str, help="Specify prometheus server.", default=PROM_SERVER) parser.add_argument("--interval", type=int, help="Specify query interval.", default=PROM_QUERY_INTERVAL) + parser.add_argument("--start-time", type=str, help="Specify query start time.", default=PROM_QUERY_START_TIME) + parser.add_argument("--end-time", type=str, help="Specify query end time.", default=PROM_QUERY_END_TIME) parser.add_argument("--step", type=str, help="Specify query step.", default=PROM_QUERY_STEP) parser.add_argument("--metric-prefix", type=str, help="Specify metrix prefix to filter.", default=KEPLER_METRIC_PREFIX) - parser.add_argument("-tm", "--thirdparty-metrics", nargs='+', help="Specify the thirdparty metrics that not included by Kepler", default="") - parser.add_argument("--custom-metrics", type=str, help="Specify comma seperated custom metrics list.", default=CUSTOM_METRICS) + parser.add_argument("-tm", "--thirdparty-metrics", nargs='+', help="Specify the thirdparty metrics that are not included by Kepler", default="") # Train arguments parser.add_argument("-p", "--pipeline-name", type=str, help="Specify pipeline name.") diff --git a/model_training/README.md b/model_training/README.md index 11c856c2..2b1eed37 100644 --- a/model_training/README.md +++ b/model_training/README.md @@ -61,7 +61,7 @@ Users might want to run a custom benchmark outside of `kepler-model-server` and ./script.sh custom_collect -When this command is run, it creates a file `customBenchmark.json` with a default start and end timestamps for querying metrics from Prometheus. If the desired time window for metrics collection is different, manually update the timestamps in that file accordinly. Then run again the same command to collect the metrics. +Use either `interval` or [`start_time`, `end_time`] options to set the desired time window for metrics collection from Prometheus. ## 4. Validate collected metrics Validation of metrics happens by default at the time of their collection. It is also possible to validate the collected metrics explicitly. 
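The window selection behind that README note follows the patched `query()` above: a saved benchmark JSON wins, an explicit `--start-time`/`--end-time` pair comes next, and `--interval` is the fallback. A minimal standalone sketch of the start/end resolution; the function name and the direct calls are illustrative, not part of the patch:

```
# Sketch of the time-window resolution used when the benchmark JSON does not
# exist yet: an explicit [start_time, end_time] pair wins over the interval.
# Mirrors the logic in the patched query(); the helper itself is illustrative.
import datetime

def resolve_window(start_time, end_time, interval):
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    if start_time != "" and end_time != "":
        # explicit window given as UTC timestamps
        start = datetime.datetime.strptime(start_time, fmt)
        end = datetime.datetime.strptime(end_time, fmt)
    else:
        # fall back to "the last <interval> seconds"
        end = datetime.datetime.now(datetime.timezone.utc)
        start = end - datetime.timedelta(seconds=interval)
    return start, end

print(resolve_window("", "", 300))
print(resolve_window("2023-11-13T02:00:00Z", "2023-11-13T03:00:00Z", 300))
```

The resolved timestamps are saved back to the benchmark JSON and shifted by `UTC_OFFSET_TIMEDELTA` before the range queries are issued, as in the diff above.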
diff --git a/model_training/script.sh b/model_training/script.sh
index b07c41e6..bdbeb293 100755
--- a/model_training/script.sh
+++ b/model_training/script.sh
@@ -130,22 +130,18 @@ function collect_data() {
     BENCHMARK=$1
     BENCHMARK_NS=$2
     SLEEP_TIME=$3
-    kubectl apply -f benchmark/${BENCHMARK}.yaml
-    wait_for_benchmark ${BENCHMARK} ${BENCHMARK_NS} ${SLEEP_TIME}
-    save_benchmark ${BENCHMARK} ${BENCHMARK_NS}
+    if [ "$BENCHMARK" != "customBenchmark" ]; then
+        kubectl apply -f benchmark/${BENCHMARK}.yaml
+        wait_for_benchmark ${BENCHMARK} ${BENCHMARK_NS} ${SLEEP_TIME}
+        save_benchmark ${BENCHMARK} ${BENCHMARK_NS}
+        kubectl delete -f benchmark/${BENCHMARK}.yaml
+    fi
     ARGS="-i ${BENCHMARK} -o ${BENCHMARK}_kepler_query -s ${PROM_SERVER}"
     if [ -z "$NATIVE" ]; then
         docker run --rm -v $CPE_DATAPATH:/data --network=host ${ENTRYPOINT_IMG} query ${ARGS}|| true
     else
         python ../cmd/main.py query ${ARGS}|| true
     fi
-    kubectl delete -f benchmark/${BENCHMARK}.yaml
-}
-
-function custom_collect_data() {
-    BENCHMARK=$1
-    ARGS="-i ${BENCHMARK} -o ${BENCHMARK}_kepler_query -s ${PROM_SERVER}"
-    python ../cmd/main.py custom_query ${ARGS}|| true
 }
 
 function deploy_prom_dependency(){
@@ -183,7 +179,7 @@ function collect() {
 }
 
 function custom_collect() {
-    custom_collect_data customBenchmark
+    collect_data customBenchmark
 }
 
 function quick_collect() {
@@ -205,12 +201,8 @@ function custom_train() {
 
 function validate() {
     BENCHMARK=$1
     ARGS="-i ${BENCHMARK}_kepler_query --benchmark ${BENCHMARK}"
-    if [ -z "$NATIVE" -a "${BENCHMARK}" == "customBenchmark" ]; then
-        docker run --rm -v $CPE_DATAPATH:/data ${ENTRYPOINT_IMG} custom_validate ${ARGS}
-    elif [ -z "$NATIVE" ]; then
+    if [ -z "$NATIVE" ]; then
         docker run --rm -v $CPE_DATAPATH:/data ${ENTRYPOINT_IMG} validate ${ARGS}
-    elif [ "${BENCHMARK}" == "customBenchmark" ]; then
-        python ../cmd/main.py custom_validate ${ARGS}|| true
     else
         python ../cmd/main.py validate ${ARGS}|| true
     fi
@@ -224,7 +216,7 @@ function _export() {
     INCLUDE_RAW=$5
 
     if [ $# -lt 4 ]; then
-        echo "need arguements: [machine_id] [path_to_models] [publisher] [benchmark_type]"
+        echo "need arguments: [machine_id] [path_to_models] [publisher] [benchmark_name]"
         exit 2
     fi
 
diff --git a/src/util/prom_types.py b/src/util/prom_types.py
index 41d80f37..86ad1339 100644
--- a/src/util/prom_types.py
+++ b/src/util/prom_types.py
@@ -7,6 +7,8 @@ PROM_HEADERS = ''
 PROM_QUERY_INTERVAL = 300
 PROM_QUERY_STEP = 3
+PROM_QUERY_START_TIME = '2023-11-13T02:47:57Z'
+PROM_QUERY_END_TIME = ''
 
 PROM_SERVER = getConfig('PROM_SERVER', PROM_SERVER)
 PROM_HEADERS = getConfig('PROM_HEADERS', PROM_HEADERS)
@@ -14,11 +16,12 @@ PROM_SSL_DISABLE = True if getConfig('PROM_SSL_DISABLE', PROM_SSL_DISABLE).lower() == 'true' else False
 PROM_QUERY_INTERVAL = getConfig('PROM_QUERY_INTERVAL', PROM_QUERY_INTERVAL)
 PROM_QUERY_STEP = getConfig('PROM_QUERY_STEP', PROM_QUERY_STEP)
+PROM_QUERY_START_TIME = getConfig('PROM_QUERY_START_TIME', PROM_QUERY_START_TIME)
+PROM_QUERY_END_TIME = getConfig('PROM_QUERY_END_TIME', PROM_QUERY_END_TIME)
 PROM_THIRDPARTY_METRICS = getConfig('PROM_THIRDPARTY_METRICS', "").split(',')
 
 metric_prefix = "kepler_"
-custom_metrics = "node_cpu_seconds_total"
 TIMESTAMP_COL = "timestamp"
 PACKAGE_COL = "package"
 SOURCE_COL = "source"

From 4132bace8dcebfcd307a9ab8559cce90eef12c7c Mon Sep 17 00:00:00 2001
From: Krishnasuri Narayanam
Date: Mon, 13 Nov 2023 12:01:09 +0530
Subject: [PATCH 3/4] setting start time to empty

Signed-off-by: Krishnasuri Narayanam
---
 src/util/prom_types.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git
a/src/util/prom_types.py b/src/util/prom_types.py index 86ad1339..aa650c33 100644 --- a/src/util/prom_types.py +++ b/src/util/prom_types.py @@ -7,7 +7,7 @@ PROM_HEADERS = '' PROM_QUERY_INTERVAL = 300 PROM_QUERY_STEP = 3 -PROM_QUERY_START_TIME = '2023-11-13T02:47:57Z' +PROM_QUERY_START_TIME = '' PROM_QUERY_END_TIME = '' PROM_SERVER = getConfig('PROM_SERVER', PROM_SERVER) From 44983ff3d7e1890f3c510b2bd7a1a7e962f6a3b8 Mon Sep 17 00:00:00 2001 From: Krishnasuri Narayanam Date: Mon, 13 Nov 2023 12:25:49 +0530 Subject: [PATCH 4/4] changes to relax the assumption that stressng should be included in the benchmark suite Signed-off-by: Krishnasuri Narayanam --- src/train/exporter/exporter.py | 5 +---- src/train/exporter/validator.py | 11 ----------- 2 files changed, 1 insertion(+), 15 deletions(-) diff --git a/src/train/exporter/exporter.py b/src/train/exporter/exporter.py index da58fd75..0d2d37d9 100644 --- a/src/train/exporter/exporter.py +++ b/src/train/exporter/exporter.py @@ -11,7 +11,7 @@ cur_path = os.path.join(os.path.dirname(__file__)) sys.path.append(cur_path) -from validator import validate_arguments, find_acceptable_mae +from validator import find_acceptable_mae from train_types import ModelOutputType, PowerSourceMap, FeatureGroup from loader import load_csv, load_pipeline_metadata, get_model_group_path, load_metadata, load_train_args, get_preprocess_folder, get_general_filename from saver import WEIGHT_FILENAME, save_pipeline_metadata, save_train_args @@ -19,9 +19,6 @@ from writer import generate_pipeline_page, generate_validation_results, append_version_readme def export(data_path, pipeline_path, machine_path, machine_id, version, publisher, collect_date, include_raw=False): - if not validate_arguments(pipeline_path): - return - pipeline_metadata = load_metadata(pipeline_path) if pipeline_metadata is None: print("no pipeline metadata") diff --git a/src/train/exporter/validator.py b/src/train/exporter/validator.py index b9618bed..d8eb116a 100644 --- a/src/train/exporter/validator.py +++ b/src/train/exporter/validator.py @@ -7,18 +7,7 @@ from loader import load_train_args from config import ERROR_KEY -required_benchmark = ["sample_kepler_query", "stressng_kepler_query", "customBenchmark_kepler_query"] - default_threshold_percent = 20 - -def validate_arguments(pipeline_path): - train_args = load_train_args(pipeline_path) - inputs = train_args["input"].split(",") - missing_inputs = [input for input in required_benchmark if input not in inputs] - if len(missing_inputs) > 0: - print("missing required training inputs: ", missing_inputs) - return False - return True def find_acceptable_mae(preprocess_data, metadata): power_cols = [col for col in preprocess_data.columns if "power" in col]
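Taken together, the reworked query step discovers every metric whose name contains the configured prefix, appends any third-party metrics, and range-queries each one over the resolved window before validating and saving the result. A rough sketch of that flow using `prometheus_api_client` directly; the server URL, the third-party metric list, and the step value are assumptions for illustration:

```
# Rough sketch of the metric discovery and range-query flow from the patched
# query(); the Prometheus URL, third-party metric list, and step are assumptions.
import datetime
from prometheus_api_client import PrometheusConnect

prom = PrometheusConnect(url="http://localhost:9090", disable_ssl=True)

metric_prefix = "kepler_"
thirdparty_metrics = ["node_cpu_seconds_total"]  # example third-party metric

# take every metric whose name contains the Kepler prefix, plus extras
queries = [m for m in prom.all_metrics() if metric_prefix in m]
queries.extend(thirdparty_metrics)

end = datetime.datetime.now(datetime.timezone.utc)
start = end - datetime.timedelta(seconds=300)

# one range query per metric over the resolved window
response = {
    q: prom.custom_query_range(q, start_time=start, end_time=end, step="3")
    for q in queries
}
print({q: len(series) for q, series in response.items()})
```

The real `query()` additionally saves the raw response as JSON and writes a per-query validation summary (sample count, non-zero samples, and maximum value) to CSV, as shown in the patches above.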