Merge pull request #191 from knarayan/main
Changes to collect metrics from Prometheus when the benchmark is run outside of kepler-model-server
sunya-ch authored Nov 13, 2023
2 parents c02be1c + 44983ff commit b1ee3f0
Showing 7 changed files with 147 additions and 57 deletions.
105 changes: 82 additions & 23 deletions cmd/main.py
@@ -18,7 +18,7 @@
src_path = os.path.join(os.path.dirname(__file__), '..', 'src')
sys.path.append(src_path)

from util.prom_types import PROM_SERVER, PROM_QUERY_INTERVAL, PROM_QUERY_STEP, PROM_HEADERS, PROM_SSL_DISABLE, PROM_THIRDPARTY_METRICS
from util.prom_types import PROM_SERVER, PROM_QUERY_INTERVAL, PROM_QUERY_STEP, PROM_QUERY_START_TIME, PROM_QUERY_END_TIME, PROM_HEADERS, PROM_SSL_DISABLE, PROM_THIRDPARTY_METRICS
from util.prom_types import metric_prefix as KEPLER_METRIC_PREFIX, node_info_column, prom_responses_to_results, TIMESTAMP_COL, feature_to_query, update_thirdparty_metrics
from util.train_types import ModelOutputType, FeatureGroup, FeatureGroups, is_single_source_feature_group, SingleSourceFeatures
from util.loader import load_json, DEFAULT_PIPELINE, load_pipeline_metadata, get_pipeline_path, get_model_group_path, list_pipelines, list_model_names, load_metadata, load_csv, get_machine_path, get_preprocess_folder, get_general_filename
@@ -42,10 +42,16 @@ def print_file_to_stdout(args):

def extract_time(benchmark_filename):
data = load_json(data_path, benchmark_filename)
start_str = data["metadata"]["creationTimestamp"]
start = datetime.datetime.strptime(start_str, '%Y-%m-%dT%H:%M:%SZ')
end_str = data["status"]["results"][-1]["repetitions"][-1]["pushedTime"].split(".")[0]
end = datetime.datetime.strptime(end_str, '%Y-%m-%d %H:%M:%S')
if benchmark_filename != "customBenchmark":
start_str = data["metadata"]["creationTimestamp"]
start = datetime.datetime.strptime(start_str, '%Y-%m-%dT%H:%M:%SZ')
end_str = data["status"]["results"][-1]["repetitions"][-1]["pushedTime"].split(".")[0]
end = datetime.datetime.strptime(end_str, '%Y-%m-%d %H:%M:%S')
else:
start_str = data["startTimeUTC"]
start = datetime.datetime.strptime(start_str, '%Y-%m-%dT%H:%M:%SZ')
end_str = data["endTimeUTC"]
end = datetime.datetime.strptime(end_str, '%Y-%m-%dT%H:%M:%SZ')
print(UTC_OFFSET_TIMEDELTA)
return start-UTC_OFFSET_TIMEDELTA, end-UTC_OFFSET_TIMEDELTA

@@ -101,11 +107,11 @@ def summary_validation(validate_df):
def get_validate_df(benchmark_filename, query_response):
items = []
query_results = prom_responses_to_results(query_response)
queries = [query for query in query_results.keys() if "container" in query]
container_queries = [query for query in query_results.keys() if "container" in query]
status_data = load_json(data_path, benchmark_filename)
if status_data is None:
if status_data is None or status_data.get("status", None) == None:
# select all with keyword
for query in queries:
for query in container_queries:
df = query_results[query]
if len(df) == 0:
# set validate item // no value
@@ -141,7 +147,7 @@ def get_validate_df(benchmark_filename, query_response):
repetitions = result["repetitions"]
for rep in repetitions:
podname = rep["pod"]
for query in queries:
for query in container_queries:
df = query_results[query]
if len(df) == 0:
# set validate item // no value
@@ -167,18 +173,52 @@ def get_validate_df(benchmark_filename, query_response):
energy_queries = [query for query in query_results.keys() if "_joules" in query]
for query in energy_queries:
df = query_results[query]
filtered_df = df.copy()
if len(df) == 0:
# set validate item // no value
item = dict()
item["pod"] = ""
item["scenarioID"] = ""
item["query"] = query
item["count"] = 0
item[">0"] = 0
item["total"] = 0
items += [item]
continue
# set validate item
item = dict()
item["pod"] = ""
item["scenarioID"] = ""
item["query"] = query
item["count"] = len(filtered_df)
item[">0"] = len(filtered_df[filtered_df[query] > 0])
item["total"] = filtered_df[query].max()
item["count"] = len(df)
item[">0"] = len(df[df[query] > 0])
item["total"] = df[query].max()
items += [item]
other_queries = [query for query in query_results.keys() if query not in container_queries and query not in energy_queries]
for query in other_queries:
df = query_results[query]
if len(df) == 0:
# set validate item // no value
item = dict()
item["pod"] = benchmark_filename
item["scenarioID"] = ""
item["query"] = query
item["count"] = 0
item[">0"] = 0
item["total"] = 0
items += [item]
continue
# set validate item
item = dict()
item["pod"] = benchmark_filename
item["scenarioID"] = ""
item["query"] = query
item["count"] = len(df)
item[">0"] = len(df[df[query] > 0])
item["total"] = df[query].max()
items += [item]
validate_df = pd.DataFrame(items)
print(validate_df.groupby(["scenarioID", "query"]).sum()[["count", ">0"]])
if not validate_df.empty:
print(validate_df.groupby(["scenarioID", "query"]).sum()[["count", ">0"]])
return validate_df

def check_ot_fg(args, valid_fg):
@@ -207,9 +247,27 @@ def query(args):
prom = PrometheusConnect(url=args.server, headers=PROM_HEADERS, disable_ssl=PROM_SSL_DISABLE)
benchmark_filename = args.input
if benchmark_filename == "":
print("Query last {} interval.".format(args.interval))
end = datetime.datetime.now()
start = end - datetime.timedelta(seconds=args.interval)
print("need name of the benchmark: sample, stressng, or customBenchmark")
exit()
filepath = os.path.join(data_path, benchmark_filename+".json")
if not os.path.isfile(filepath):
if args.start_time != "" and args.end_time != "":
# by [start time, end time]
print("Query from start_time {} to end_time {}.".format(args.start_time, args.end_time))
start = datetime.datetime.strptime(args.start_time, '%Y-%m-%dT%H:%M:%SZ')
end = datetime.datetime.strptime(args.end_time , '%Y-%m-%dT%H:%M:%SZ')
else:
# by interval
print("Query last {} interval.".format(args.interval))
end = datetime.datetime.now(datetime.timezone.utc)
start = end - datetime.timedelta(seconds=args.interval)
# save benchmark
item = dict()
item["startTimeUTC"] = start.strftime("%Y-%m-%dT%H:%M:%SZ")
item["endTimeUTC"] = end.strftime("%Y-%m-%dT%H:%M:%SZ")
save_json(path=data_path, name=benchmark_filename, data=item)
start = datetime.datetime.strptime(item["startTimeUTC"], '%Y-%m-%dT%H:%M:%SZ') - UTC_OFFSET_TIMEDELTA
end = datetime.datetime.strptime(item["endTimeUTC"], '%Y-%m-%dT%H:%M:%SZ') - UTC_OFFSET_TIMEDELTA
else:
print("Query from {}.".format(benchmark_filename))
start, end = extract_time(benchmark_filename)
Expand All @@ -227,10 +285,9 @@ def query(args):
response = _range_queries(prom, queries, start, end, args.step, None)
save_json(path=data_path, name=args.output, data=response)
# try validation if applicable
if benchmark_filename != "" and args.metric_prefix == KEPLER_METRIC_PREFIX:
validate_df = get_validate_df(benchmark_filename, response)
summary_validation(validate_df)
save_csv(path=data_path, name=args.output + "_validate_result", data=validate_df)
validate_df = get_validate_df(benchmark_filename, response)
summary_validation(validate_df)
save_csv(path=data_path, name=args.output + "_validate_result", data=validate_df)

def validate(args):
if not args.benchmark:
@@ -655,7 +712,7 @@ def plot(args):
energy_sources = args.energy_source.split(",")
output_folder = os.path.join(data_path, args.output)
if not os.path.exists(output_folder):
os.mkdir(output_folder)
os.makedirs(output_folder, exist_ok=True)
if args.target_data == "preprocess":
data_saved_path = get_preprocess_folder(pipeline_path)
feature_plot = []
@@ -870,9 +927,11 @@ def plot_scenario(args):
# Query arguments
parser.add_argument("-s", "--server", type=str, help="Specify prometheus server.", default=PROM_SERVER)
parser.add_argument("--interval", type=int, help="Specify query interval.", default=PROM_QUERY_INTERVAL)
parser.add_argument("--start-time", type=str, help="Specify query start time.", default=PROM_QUERY_START_TIME)
parser.add_argument("--end-time", type=str, help="Specify query end time.", default=PROM_QUERY_END_TIME)
parser.add_argument("--step", type=str, help="Specify query step.", default=PROM_QUERY_STEP)
parser.add_argument("--metric-prefix", type=str, help="Specify metrix prefix to filter.", default=KEPLER_METRIC_PREFIX)
parser.add_argument("-tm", "--thirdparty-metrics", nargs='+', help="Specify the thirdparty metrics that not included by Kepler", default="")
parser.add_argument("-tm", "--thirdparty-metrics", nargs='+', help="Specify the thirdparty metrics that are not included by Kepler", default="")

# Train arguments
parser.add_argument("-p", "--pipeline-name", type=str, help="Specify pipeline name.")
41 changes: 36 additions & 5 deletions model_training/README.md
@@ -54,7 +54,31 @@ This is only for testing purpose.
It might take an hour to run and collect all benchmarks. Output, including the CPE CR and the Prometheus query response, will be in the `data` folder by default.
## 3. Profile and train
## 3. Collect metrics without running benchmark
Users might want to run a custom benchmark outside of `kepler-model-server` and collect metrics for training the model using `kepler-model-server`.
### Custom Benchmark
```
./script.sh custom_collect
```
Use either `interval` or [`start_time`, `end_time`] options to set the desired time window for metrics collection from Prometheus.
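A minimal sketch of both ways to set the window, assuming a local Prometheus at `http://localhost:9090` and placeholder timestamps (the flag names come from the updated `cmd/main.py` in this commit; the saved-file layout is likewise inferred from that code):
```
# Interval mode (default): query the last PROM_QUERY_INTERVAL seconds ending now.
./script.sh custom_collect

# Explicit window: run the query command directly (native python environment assumed)
# from the model_training directory, with UTC timestamps.
python ../cmd/main.py query \
    -i customBenchmark \
    -o customBenchmark_kepler_query \
    -s http://localhost:9090 \
    --start-time 2023-11-13T00:00:00Z \
    --end-time 2023-11-13T01:00:00Z

# The same window can instead come from the PROM_QUERY_START_TIME / PROM_QUERY_END_TIME
# configuration keys (see src/util/prom_types.py). Whichever way it is set, the chosen
# window is saved for later validation and training, e.g. data/customBenchmark.json:
#   {"startTimeUTC": "2023-11-13T00:00:00Z", "endTimeUTC": "2023-11-13T01:00:00Z"}
```
The saved file is what `extract_time` in `cmd/main.py` reads back when the same `customBenchmark` input is validated or trained against later.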
## 4. Validate collected metrics
Validation of metrics happens by default at the time of their collection. It is also possible to validate the collected metrics explicitly.
### Quick sample
```
./script.sh validate sample
```
### Full run
```
./script.sh validate stressng
```
### Custom benchmark
```
./script.sh validate customBenchmark
```
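The validation summary is printed to stdout; when validation runs as part of the query flow, the per-query counts (columns `pod`, `scenarioID`, `query`, `count`, `>0`, `total`) are also written as a CSV next to the query output. The file name below is an assumption based on the `_validate_result` suffix used in `cmd/main.py`:
```
# Assumed location of the saved validation counts for the custom benchmark run:
head data/customBenchmark_kepler_query_validate_result.csv
```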
## 5. Profile and train
You can train the model by using the docker image, which requires no environment setup but can be limited by docker constraints, or train the model natively by setting up your python environment as follows.
@@ -73,6 +97,12 @@ You can train the model by using docker image which require no environment setup
./script.sh train
```
#### Custom benchmark
```
./script.sh custom_train
```
Training output will be in the `/data` folder by default. The folder contains:
- preprocessed data from Prometheus query response
- profiles
@@ -106,20 +136,21 @@ Run
1. Fork `kepler-model-db`.
1. Validate and make a copy by export command. Need to define `machine id`, `local path to forked kepler-model-db/models`, and `author github account`.
1. Validate and make a copy using the export command. Need to define `machine id`, `local path to forked kepler-model-db/models`, `author github account`, and `benchmark type`.
Run
```
./script.sh export <machine id> <path to kepler-model-db/models> <author github account>
./script.sh export <machine id> <path to kepler-model-db/models> <author github account> <benchmark type>
```
If you are agree to also share the raw data (preprocessed data and archived file of full pipeline), run
If you also agree to share the raw data (preprocessed data and archived file of full pipeline), run
```
./script.sh export_with_raw <machine id> <path to kepler-model-db/models> <author github account>
./script.sh export_with_raw <machine id> <path to kepler-model-db/models> <author github account> <benchmark type>
```
- set `NATIVE="true"` to export natively.
- Benchmark type accepts one of the values `sample`, `stressng`, or `customBenchmark` (a concrete example follows this list).
2. Add information about your machine in `./models/README.md` in `kepler-model-db`. You may omit any column as needed.
3. Push a PR to `kepler-model-db`.
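For illustration only (the machine id, models path, and GitHub account below are made-up placeholders), a complete export call with the new benchmark-type argument could look like:
```
# Export models trained from the stressng benchmark:
./script.sh export nuc-01 ~/kepler-model-db/models my-github-id stressng

# Same, but also sharing the preprocessed data and the archived pipeline:
./script.sh export_with_raw nuc-01 ~/kepler-model-db/models my-github-id customBenchmark
```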
36 changes: 23 additions & 13 deletions model_training/script.sh
@@ -130,16 +130,18 @@ function collect_data() {
BENCHMARK=$1
BENCHMARK_NS=$2
SLEEP_TIME=$3
kubectl apply -f benchmark/${BENCHMARK}.yaml
wait_for_benchmark ${BENCHMARK} ${BENCHMARK_NS} ${SLEEP_TIME}
save_benchmark ${BENCHMARK} ${BENCHMARK_NS}
if [ "$BENCHMARK" != "customBenchmark" ]; then
kubectl apply -f benchmark/${BENCHMARK}.yaml
wait_for_benchmark ${BENCHMARK} ${BENCHMARK_NS} ${SLEEP_TIME}
save_benchmark ${BENCHMARK} ${BENCHMARK_NS}
kubectl delete -f benchmark/${BENCHMARK}.yaml
fi
ARGS="-i ${BENCHMARK} -o ${BENCHMARK}_kepler_query -s ${PROM_SERVER}"
if [ -z "$NATIVE" ]; then
docker run --rm -v $CPE_DATAPATH:/data --network=host ${ENTRYPOINT_IMG} query ${ARGS}|| true
else
python ../cmd/main.py query ${ARGS}|| true
fi
kubectl delete -f benchmark/${BENCHMARK}.yaml
}

function deploy_prom_dependency(){
@@ -176,18 +178,26 @@ function collect() {
collect_data stressng cpe-operator-system 60
}

function custom_collect() {
collect_data customBenchmark
}

function quick_collect() {
collect_data sample cpe-operator-system 10
}

function train() {
train_model stressng_kepler_query ${VERSION}
train_model stressng_kepler_query ${VERSION}_stressng
}

function quick_train() {
train_model sample_kepler_query ${VERSION}_sample
}

function custom_train() {
train_model customBenchmark_kepler_query ${VERSION}_customBenchmark
}

function validate() {
BENCHMARK=$1
ARGS="-i ${BENCHMARK}_kepler_query --benchmark ${BENCHMARK}"
@@ -202,16 +212,16 @@ function _export() {
ID=$1
OUTPUT=$2
PUBLISHER=$3
INCLUDE_RAW=$4
MAIN_COLLECT_INPUT=$4
INCLUDE_RAW=$5

if [ $# -lt 3 ]; then
echo "need arguements: [machine_id] [path_to_models] [publisher]"
if [ $# -lt 4 ]; then
echo "need arguements: [machine_id] [path_to_models] [publisher] [benchmark_name]"
exit 2
fi

PIPELINE_NAME=${PIPELINE_PREFIX}${VERSION}
VALIDATE_INPUT="stressng_kepler_query"
MAIN_COLLECT_INPUT="stressng"
PIPELINE_NAME=${PIPELINE_PREFIX}${VERSION}_${MAIN_COLLECT_INPUT}
VALIDATE_INPUT="${MAIN_COLLECT_INPUT}_kepler_query"
ARGS="--id ${ID} -p ${PIPELINE_NAME} -i ${VALIDATE_INPUT} --benchmark ${MAIN_COLLECT_INPUT} --version ${VERSION} --publisher ${PUBLISHER} ${INCLUDE_RAW}"
echo "${ARGS}"
if [ -z "$NATIVE" ]; then
@@ -222,11 +232,11 @@
}

function export() {
_export $1 $2 $3
_export $1 $2 $3 $4
}

function export_with_raw() {
_export $1 $2 $3 "--include-raw true"
_export $1 $2 $3 $4 "--include-raw true"
}

function cleanup() {
5 changes: 1 addition & 4 deletions src/train/exporter/exporter.py
@@ -11,17 +11,14 @@
cur_path = os.path.join(os.path.dirname(__file__))
sys.path.append(cur_path)

from validator import validate_arguments, find_acceptable_mae
from validator import find_acceptable_mae
from train_types import ModelOutputType, PowerSourceMap, FeatureGroup
from loader import load_csv, load_pipeline_metadata, get_model_group_path, load_metadata, load_train_args, get_preprocess_folder, get_general_filename
from saver import WEIGHT_FILENAME, save_pipeline_metadata, save_train_args
from format import time_to_str
from writer import generate_pipeline_page, generate_validation_results, append_version_readme

def export(data_path, pipeline_path, machine_path, machine_id, version, publisher, collect_date, include_raw=False):
if not validate_arguments(pipeline_path):
return

pipeline_metadata = load_metadata(pipeline_path)
if pipeline_metadata is None:
print("no pipeline metadata")
11 changes: 0 additions & 11 deletions src/train/exporter/validator.py
@@ -7,18 +7,7 @@
from loader import load_train_args
from config import ERROR_KEY

required_benchmark = ["stressng_kepler_query"]

default_threshold_percent = 20

def validate_arguments(pipeline_path):
train_args = load_train_args(pipeline_path)
inputs = train_args["input"].split(",")
missing_inputs = [input for input in required_benchmark if input not in inputs]
if len(missing_inputs) > 0:
print("missing required training inputs: ", missing_inputs)
return False
return True

def find_acceptable_mae(preprocess_data, metadata):
power_cols = [col for col in preprocess_data.columns if "power" in col]
4 changes: 4 additions & 0 deletions src/util/prom_types.py
@@ -7,13 +7,17 @@
PROM_HEADERS = ''
PROM_QUERY_INTERVAL = 300
PROM_QUERY_STEP = 3
PROM_QUERY_START_TIME = ''
PROM_QUERY_END_TIME = ''

PROM_SERVER = getConfig('PROM_SERVER', PROM_SERVER)
PROM_HEADERS = getConfig('PROM_HEADERS', PROM_HEADERS)
PROM_HEADERS = None if PROM_HEADERS == '' else PROM_HEADERS
PROM_SSL_DISABLE = True if getConfig('PROM_SSL_DISABLE', PROM_SSL_DISABLE).lower() == 'true' else False
PROM_QUERY_INTERVAL = getConfig('PROM_QUERY_INTERVAL', PROM_QUERY_INTERVAL)
PROM_QUERY_STEP = getConfig('PROM_QUERY_STEP', PROM_QUERY_STEP)
PROM_QUERY_START_TIME = getConfig('PROM_QUERY_START_TIME', PROM_QUERY_START_TIME)
PROM_QUERY_END_TIME = getConfig('PROM_QUERY_END_TIME', PROM_QUERY_END_TIME)

PROM_THIRDPARTY_METRICS = getConfig('PROM_THIRDPARTY_METRICS', "").split(',')
