diff --git a/cmd/main.py b/cmd/main.py
index d70ca2e8..3ccd8d78 100644
--- a/cmd/main.py
+++ b/cmd/main.py
@@ -18,7 +18,7 @@
 src_path = os.path.join(os.path.dirname(__file__), '..', 'src')
 sys.path.append(src_path)
 
-from util.prom_types import PROM_SERVER, PROM_QUERY_INTERVAL, PROM_QUERY_STEP, PROM_HEADERS, PROM_SSL_DISABLE, PROM_THIRDPARTY_METRICS
+from util.prom_types import PROM_SERVER, PROM_QUERY_INTERVAL, PROM_QUERY_STEP, PROM_QUERY_START_TIME, PROM_QUERY_END_TIME, PROM_HEADERS, PROM_SSL_DISABLE, PROM_THIRDPARTY_METRICS
 from util.prom_types import metric_prefix as KEPLER_METRIC_PREFIX, node_info_column, prom_responses_to_results, TIMESTAMP_COL, feature_to_query, update_thirdparty_metrics
 from util.train_types import ModelOutputType, FeatureGroup, FeatureGroups, is_single_source_feature_group, SingleSourceFeatures
 from util.loader import load_json, DEFAULT_PIPELINE, load_pipeline_metadata, get_pipeline_path, get_model_group_path, list_pipelines, list_model_names, load_metadata, load_csv, get_machine_path, get_preprocess_folder, get_general_filename
@@ -42,10 +42,16 @@ def print_file_to_stdout(args):
 
 def extract_time(benchmark_filename):
     data = load_json(data_path, benchmark_filename)
-    start_str = data["metadata"]["creationTimestamp"]
-    start = datetime.datetime.strptime(start_str, '%Y-%m-%dT%H:%M:%SZ')
-    end_str = data["status"]["results"][-1]["repetitions"][-1]["pushedTime"].split(".")[0]
-    end = datetime.datetime.strptime(end_str, '%Y-%m-%d %H:%M:%S')
+    if benchmark_filename != "customBenchmark":
+        start_str = data["metadata"]["creationTimestamp"]
+        start = datetime.datetime.strptime(start_str, '%Y-%m-%dT%H:%M:%SZ')
+        end_str = data["status"]["results"][-1]["repetitions"][-1]["pushedTime"].split(".")[0]
+        end = datetime.datetime.strptime(end_str, '%Y-%m-%d %H:%M:%S')
+    else:
+        start_str = data["startTimeUTC"]
+        start = datetime.datetime.strptime(start_str, '%Y-%m-%dT%H:%M:%SZ')
+        end_str = data["endTimeUTC"]
+        end = datetime.datetime.strptime(end_str, '%Y-%m-%dT%H:%M:%SZ')
     print(UTC_OFFSET_TIMEDELTA)
     return start-UTC_OFFSET_TIMEDELTA, end-UTC_OFFSET_TIMEDELTA
 
@@ -101,11 +107,11 @@ def summary_validation(validate_df):
 def get_validate_df(benchmark_filename, query_response):
     items = []
     query_results = prom_responses_to_results(query_response)
-    queries = [query for query in query_results.keys() if "container" in query]
+    container_queries = [query for query in query_results.keys() if "container" in query]
     status_data = load_json(data_path, benchmark_filename)
-    if status_data is None:
+    if status_data is None or status_data.get("status", None) == None:
         # select all with keyword
-        for query in queries:
+        for query in container_queries:
             df = query_results[query]
             if len(df) == 0:
                 # set validate item // no value
@@ -141,7 +147,7 @@ def get_validate_df(benchmark_filename, query_response):
             repetitions = result["repetitions"]
             for rep in repetitions:
                 podname = rep["pod"]
-                for query in queries:
+                for query in container_queries:
                     df = query_results[query]
                     if len(df) == 0:
                         # set validate item // no value
@@ -167,18 +173,52 @@ def get_validate_df(benchmark_filename, query_response):
     energy_queries = [query for query in query_results.keys() if "_joules" in query]
     for query in energy_queries:
         df = query_results[query]
-        filtered_df = df.copy()
+        if len(df) == 0:
+            # set validate item // no value
+            item = dict()
+            item["pod"] = ""
+            item["scenarioID"] = ""
+            item["query"] = query
+            item["count"] = 0
+            item[">0"] = 0
+            item["total"] = 0
+            items += [item]
+            continue
         # set validate item
         item = dict()
item["pod"] = "" item["scenarioID"] = "" item["query"] = query - item["count"] = len(filtered_df) - item[">0"] = len(filtered_df[filtered_df[query] > 0]) - item["total"] = filtered_df[query].max() + item["count"] = len(df) + item[">0"] = len(df[df[query] > 0]) + item["total"] = df[query].max() + items += [item] + other_queries = [query for query in query_results.keys() if query not in container_queries and query not in energy_queries] + for query in other_queries: + df = query_results[query] + if len(df) == 0: + # set validate item // no value + item = dict() + item["pod"] = benchmark_filename + item["scenarioID"] = "" + item["query"] = query + item["count"] = 0 + item[">0"] = 0 + item["total"] = 0 + items += [item] + continue + # set validate item + item = dict() + item["pod"] = benchmark_filename + item["scenarioID"] = "" + item["query"] = query + item["count"] = len(df) + item[">0"] = len(df[df[query] > 0]) + item["total"] = df[query].max() items += [item] validate_df = pd.DataFrame(items) - print(validate_df.groupby(["scenarioID", "query"]).sum()[["count", ">0"]]) + if not validate_df.empty: + print(validate_df.groupby(["scenarioID", "query"]).sum()[["count", ">0"]]) return validate_df def check_ot_fg(args, valid_fg): @@ -207,9 +247,27 @@ def query(args): prom = PrometheusConnect(url=args.server, headers=PROM_HEADERS, disable_ssl=PROM_SSL_DISABLE) benchmark_filename = args.input if benchmark_filename == "": - print("Query last {} interval.".format(args.interval)) - end = datetime.datetime.now() - start = end - datetime.timedelta(seconds=args.interval) + print("need name of the benchmark: sample, stressng, or customBenchmark") + exit() + filepath = os.path.join(data_path, benchmark_filename+".json") + if not os.path.isfile(filepath): + if args.start_time != "" and args.end_time != "": + # by [start time, end time] + print("Query from start_time {} to end_time {}.".format(args.start_time, args.end_time)) + start = datetime.datetime.strptime(args.start_time, '%Y-%m-%dT%H:%M:%SZ') + end = datetime.datetime.strptime(args.end_time , '%Y-%m-%dT%H:%M:%SZ') + else: + # by interval + print("Query last {} interval.".format(args.interval)) + end = datetime.datetime.now(datetime.timezone.utc) + start = end - datetime.timedelta(seconds=args.interval) + # save benchmark + item = dict() + item["startTimeUTC"] = start.strftime("%Y-%m-%dT%H:%M:%SZ") + item["endTimeUTC"] = end.strftime("%Y-%m-%dT%H:%M:%SZ") + save_json(path=data_path, name=benchmark_filename, data=item) + start = datetime.datetime.strptime(item["startTimeUTC"], '%Y-%m-%dT%H:%M:%SZ') - UTC_OFFSET_TIMEDELTA + end = datetime.datetime.strptime(item["endTimeUTC"], '%Y-%m-%dT%H:%M:%SZ') - UTC_OFFSET_TIMEDELTA else: print("Query from {}.".format(benchmark_filename)) start, end = extract_time(benchmark_filename) @@ -227,10 +285,9 @@ def query(args): response = _range_queries(prom, queries, start, end, args.step, None) save_json(path=data_path, name=args.output, data=response) # try validation if applicable - if benchmark_filename != "" and args.metric_prefix == KEPLER_METRIC_PREFIX: - validate_df = get_validate_df(benchmark_filename, response) - summary_validation(validate_df) - save_csv(path=data_path, name=args.output + "_validate_result", data=validate_df) + validate_df = get_validate_df(benchmark_filename, response) + summary_validation(validate_df) + save_csv(path=data_path, name=args.output + "_validate_result", data=validate_df) def validate(args): if not args.benchmark: @@ -655,7 +712,7 @@ def plot(args): energy_sources = 
     output_folder = os.path.join(data_path, args.output)
     if not os.path.exists(output_folder):
-        os.mkdir(output_folder)
+        os.makedirs(output_folder, exist_ok=True)
     if args.target_data == "preprocess":
         data_saved_path = get_preprocess_folder(pipeline_path)
         feature_plot = []
@@ -870,9 +927,11 @@ def plot_scenario(args):
     # Query arguments
     parser.add_argument("-s", "--server", type=str, help="Specify prometheus server.", default=PROM_SERVER)
     parser.add_argument("--interval", type=int, help="Specify query interval.", default=PROM_QUERY_INTERVAL)
+    parser.add_argument("--start-time", type=str, help="Specify query start time.", default=PROM_QUERY_START_TIME)
+    parser.add_argument("--end-time", type=str, help="Specify query end time.", default=PROM_QUERY_END_TIME)
     parser.add_argument("--step", type=str, help="Specify query step.", default=PROM_QUERY_STEP)
     parser.add_argument("--metric-prefix", type=str, help="Specify metrix prefix to filter.", default=KEPLER_METRIC_PREFIX)
-    parser.add_argument("-tm", "--thirdparty-metrics", nargs='+', help="Specify the thirdparty metrics that not included by Kepler", default="")
+    parser.add_argument("-tm", "--thirdparty-metrics", nargs='+', help="Specify the thirdparty metrics that are not included by Kepler", default="")
 
     # Train arguments
     parser.add_argument("-p", "--pipeline-name", type=str, help="Specify pipeline name.")
diff --git a/model_training/README.md b/model_training/README.md
index 88f6900e..2b1eed37 100644
--- a/model_training/README.md
+++ b/model_training/README.md
@@ -54,7 +54,31 @@ This is only for testing purpose.
 It might take an hour to run and collect all benchmarks.
 Output including CPE CR and Prometheus query response will be in `data` folder by default.
 
-## 3. Profile and train
+## 3. Collect metrics without running benchmark
+Users might want to run a custom benchmark outside of `kepler-model-server` and collect metrics for training the model using `kepler-model-server`.
+
+### Custom Benchmark
+
+    ./script.sh custom_collect
+
+Use either `interval` or [`start_time`, `end_time`] options to set the desired time window for metrics collection from Prometheus.
+
+## 4. Validate collected metrics
+Validation of metrics happens by default at the time of their collection. It is also possible to validate the collected metrics explicitly.
+
+### Quick sample
+
+    ./script.sh validate sample
+
+### Full run
+
+    ./script.sh validate stressng
+
+### Custom benchmark
+
+    ./script.sh validate customBenchmark
+
+## 5. Profile and train
 
 You can train the model by using docker image which require no environment setup but can be limited by docker limitation or train the model natively by setting up your python environment as follows.
 
@@ -73,6 +97,12 @@ You can train the model by using docker image which require no environment setup but can be limited by docker limitation or train the model natively by setting up your python environment as follows.
 ./script.sh train
 ```
 
+#### Custom benchmark
+
+```
+./script.sh custom_train
+```
+
 Training output will be in `/data` folder by default. The folder contains:
 - preprocessed data from Prometheus query response
 - profiles
@@ -106,20 +136,21 @@ Run
 1. Fork `kepler-model-db`.
 
-1. Validate and make a copy by export command. Need to define `machine id`, `local path to forked kepler-model-db/models`, and `author github account`.
+1. Validate and make a copy by export command. Need to define `machine id`, `local path to forked kepler-model-db/models`, `author github account` and `benchmark type`.
 
     Run
 
    ```
-    ./script.sh export <machine_id> <path_to_models> <publisher>
+    ./script.sh export <machine_id> <path_to_models> <publisher> <benchmark_type>
    ```
 
-    If you are agree to also share the raw data (preprocessed data and archived file of full pipeline), run
+    If you also agree to share the raw data (preprocessed data and archived file of full pipeline), run
 
    ```
-    ./script.sh export_with_raw <machine_id> <path_to_models> <publisher>
+    ./script.sh export_with_raw <machine_id> <path_to_models> <publisher> <benchmark_type>
    ```
 
     - set `NATIVE="true"` to export natively.
+    - Benchmark type accepts one of the values `sample`, `stressng` or `customBenchmark`.
 
 2. Add information of your machine in `./models/README.md` in `kepler-model-db`. You may omit any column as needed.
 3. Push PR to `kepler-model-db.
diff --git a/model_training/script.sh b/model_training/script.sh
index 073bc7c3..bdbeb293 100755
--- a/model_training/script.sh
+++ b/model_training/script.sh
@@ -130,16 +130,18 @@ function collect_data() {
     BENCHMARK=$1
     BENCHMARK_NS=$2
     SLEEP_TIME=$3
-    kubectl apply -f benchmark/${BENCHMARK}.yaml
-    wait_for_benchmark ${BENCHMARK} ${BENCHMARK_NS} ${SLEEP_TIME}
-    save_benchmark ${BENCHMARK} ${BENCHMARK_NS}
+    if [ "$BENCHMARK" != "customBenchmark" ]; then
+        kubectl apply -f benchmark/${BENCHMARK}.yaml
+        wait_for_benchmark ${BENCHMARK} ${BENCHMARK_NS} ${SLEEP_TIME}
+        save_benchmark ${BENCHMARK} ${BENCHMARK_NS}
+        kubectl delete -f benchmark/${BENCHMARK}.yaml
+    fi
     ARGS="-i ${BENCHMARK} -o ${BENCHMARK}_kepler_query -s ${PROM_SERVER}"
     if [ -z "$NATIVE" ]; then
         docker run --rm -v $CPE_DATAPATH:/data --network=host ${ENTRYPOINT_IMG} query ${ARGS}|| true
     else
         python ../cmd/main.py query ${ARGS}|| true
     fi
-    kubectl delete -f benchmark/${BENCHMARK}.yaml
 }
 
 function deploy_prom_dependency(){
@@ -176,18 +178,26 @@ function collect() {
     collect_data stressng cpe-operator-system 60
 }
 
+function custom_collect() {
+    collect_data customBenchmark
+}
+
 function quick_collect() {
     collect_data sample cpe-operator-system 10
 }
 
 function train() {
-    train_model stressng_kepler_query ${VERSION}
+    train_model stressng_kepler_query ${VERSION}_stressng
 }
 
 function quick_train() {
     train_model sample_kepler_query ${VERSION}_sample
 }
 
+function custom_train() {
+    train_model customBenchmark_kepler_query ${VERSION}_customBenchmark
+}
+
 function validate() {
     BENCHMARK=$1
     ARGS="-i ${BENCHMARK}_kepler_query --benchmark ${BENCHMARK}"
@@ -202,16 +212,16 @@ function _export() {
     ID=$1
     OUTPUT=$2
     PUBLISHER=$3
-    INCLUDE_RAW=$4
+    MAIN_COLLECT_INPUT=$4
+    INCLUDE_RAW=$5
 
-    if [ $# -lt 3 ]; then
-        echo "need arguements: [machine_id] [path_to_models] [publisher]"
+    if [ $# -lt 4 ]; then
+        echo "need arguements: [machine_id] [path_to_models] [publisher] [benchmark_name]"
         exit 2
     fi
 
-    PIPELINE_NAME=${PIPELINE_PREFIX}${VERSION}
-    VALIDATE_INPUT="stressng_kepler_query"
-    MAIN_COLLECT_INPUT="stressng"
+    PIPELINE_NAME=${PIPELINE_PREFIX}${VERSION}_${MAIN_COLLECT_INPUT}
+    VALIDATE_INPUT="${MAIN_COLLECT_INPUT}_kepler_query"
     ARGS="--id ${ID} -p ${PIPELINE_NAME} -i ${VALIDATE_INPUT} --benchmark ${MAIN_COLLECT_INPUT} --version ${VERSION} --publisher ${PUBLISHER} ${INCLUDE_RAW}"
     echo "${ARGS}"
     if [ -z "$NATIVE" ]; then
@@ -222,11 +232,11 @@ function _export() {
 }
 
 function export() {
-    _export $1 $2 $3
+    _export $1 $2 $3 $4
 }
 
 function export_with_raw() {
-    _export $1 $2 $3 "--include-raw true"
+    _export $1 $2 $3 $4 "--include-raw true"
 }
 
 function cleanup() {
diff --git a/src/train/exporter/exporter.py b/src/train/exporter/exporter.py
index da58fd75..0d2d37d9 100644
--- a/src/train/exporter/exporter.py
+++ b/src/train/exporter/exporter.py
@@ -11,7 +11,7 @@
 cur_path = os.path.join(os.path.dirname(__file__))
 sys.path.append(cur_path)
 
-from validator import validate_arguments, find_acceptable_mae
+from validator import find_acceptable_mae
 from train_types import ModelOutputType, PowerSourceMap, FeatureGroup
 from loader import load_csv, load_pipeline_metadata, get_model_group_path, load_metadata, load_train_args, get_preprocess_folder, get_general_filename
 from saver import WEIGHT_FILENAME, save_pipeline_metadata, save_train_args
@@ -19,9 +19,6 @@
 from writer import generate_pipeline_page, generate_validation_results, append_version_readme
 
 def export(data_path, pipeline_path, machine_path, machine_id, version, publisher, collect_date, include_raw=False):
-    if not validate_arguments(pipeline_path):
-        return
-
     pipeline_metadata = load_metadata(pipeline_path)
     if pipeline_metadata is None:
         print("no pipeline metadata")
diff --git a/src/train/exporter/validator.py b/src/train/exporter/validator.py
index 851f952c..d8eb116a 100644
--- a/src/train/exporter/validator.py
+++ b/src/train/exporter/validator.py
@@ -7,18 +7,7 @@
 from loader import load_train_args
 from config import ERROR_KEY
 
-required_benchmark = ["stressng_kepler_query"]
-
 default_threshold_percent = 20
-
-def validate_arguments(pipeline_path):
-    train_args = load_train_args(pipeline_path)
-    inputs = train_args["input"].split(",")
-    missing_inputs = [input for input in required_benchmark if input not in inputs]
-    if len(missing_inputs) > 0:
-        print("missing required training inputs: ", missing_inputs)
-        return False
-    return True
 
 def find_acceptable_mae(preprocess_data, metadata):
     power_cols = [col for col in preprocess_data.columns if "power" in col]
diff --git a/src/util/prom_types.py b/src/util/prom_types.py
index c51058a7..aa650c33 100644
--- a/src/util/prom_types.py
+++ b/src/util/prom_types.py
@@ -7,6 +7,8 @@
 PROM_HEADERS = ''
 PROM_QUERY_INTERVAL = 300
 PROM_QUERY_STEP = 3
+PROM_QUERY_START_TIME = ''
+PROM_QUERY_END_TIME = ''
 
 PROM_SERVER = getConfig('PROM_SERVER', PROM_SERVER)
 PROM_HEADERS = getConfig('PROM_HEADERS', PROM_HEADERS)
@@ -14,6 +16,8 @@
 PROM_SSL_DISABLE = True if getConfig('PROM_SSL_DISABLE', PROM_SSL_DISABLE).lower() == 'true' else False
 PROM_QUERY_INTERVAL = getConfig('PROM_QUERY_INTERVAL', PROM_QUERY_INTERVAL)
 PROM_QUERY_STEP = getConfig('PROM_QUERY_STEP', PROM_QUERY_STEP)
+PROM_QUERY_START_TIME = getConfig('PROM_QUERY_START_TIME', PROM_QUERY_START_TIME)
+PROM_QUERY_END_TIME = getConfig('PROM_QUERY_END_TIME', PROM_QUERY_END_TIME)
 
 PROM_THIRDPARTY_METRICS = getConfig('PROM_THIRDPARTY_METRICS', "").split(',')
 
diff --git a/src/util/saver.py b/src/util/saver.py
index d1030ef2..2f67f95f 100644
--- a/src/util/saver.py
+++ b/src/util/saver.py
@@ -14,7 +14,7 @@ def assure_path(path):
     if path == '':
         return ''
     if not os.path.exists(path):
-        os.mkdir(path)
+        os.makedirs(path, exist_ok=True)
     return path
 
 def save_json(path, name, data):