Changes to collect metrics from Prometheus with a benchmark run outside of kepler-model-server #191

Merged: 4 commits, Nov 13, 2023 (changes shown from 2 commits)
105 changes: 82 additions & 23 deletions cmd/main.py
@@ -18,7 +18,7 @@
src_path = os.path.join(os.path.dirname(__file__), '..', 'src')
sys.path.append(src_path)

from util.prom_types import PROM_SERVER, PROM_QUERY_INTERVAL, PROM_QUERY_STEP, PROM_HEADERS, PROM_SSL_DISABLE, PROM_THIRDPARTY_METRICS
from util.prom_types import PROM_SERVER, PROM_QUERY_INTERVAL, PROM_QUERY_STEP, PROM_QUERY_START_TIME, PROM_QUERY_END_TIME, PROM_HEADERS, PROM_SSL_DISABLE, PROM_THIRDPARTY_METRICS
from util.prom_types import metric_prefix as KEPLER_METRIC_PREFIX, node_info_column, prom_responses_to_results, TIMESTAMP_COL, feature_to_query, update_thirdparty_metrics
from util.train_types import ModelOutputType, FeatureGroup, FeatureGroups, is_single_source_feature_group, SingleSourceFeatures
from util.loader import load_json, DEFAULT_PIPELINE, load_pipeline_metadata, get_pipeline_path, get_model_group_path, list_pipelines, list_model_names, load_metadata, load_csv, get_machine_path, get_preprocess_folder, get_general_filename
@@ -42,10 +42,16 @@ def print_file_to_stdout(args):

def extract_time(benchmark_filename):
data = load_json(data_path, benchmark_filename)
start_str = data["metadata"]["creationTimestamp"]
start = datetime.datetime.strptime(start_str, '%Y-%m-%dT%H:%M:%SZ')
end_str = data["status"]["results"][-1]["repetitions"][-1]["pushedTime"].split(".")[0]
end = datetime.datetime.strptime(end_str, '%Y-%m-%d %H:%M:%S')
if benchmark_filename != "customBenchmark":
start_str = data["metadata"]["creationTimestamp"]
start = datetime.datetime.strptime(start_str, '%Y-%m-%dT%H:%M:%SZ')
end_str = data["status"]["results"][-1]["repetitions"][-1]["pushedTime"].split(".")[0]
end = datetime.datetime.strptime(end_str, '%Y-%m-%d %H:%M:%S')
else:
start_str = data["startTimeUTC"]
start = datetime.datetime.strptime(start_str, '%Y-%m-%dT%H:%M:%SZ')
end_str = data["endTimeUTC"]
end = datetime.datetime.strptime(end_str, '%Y-%m-%dT%H:%M:%SZ')
print(UTC_OFFSET_TIMEDELTA)
return start-UTC_OFFSET_TIMEDELTA, end-UTC_OFFSET_TIMEDELTA

@@ -101,11 +107,11 @@ def summary_validation(validate_df):
def get_validate_df(benchmark_filename, query_response):
items = []
query_results = prom_responses_to_results(query_response)
queries = [query for query in query_results.keys() if "container" in query]
container_queries = [query for query in query_results.keys() if "container" in query]
status_data = load_json(data_path, benchmark_filename)
if status_data is None:
if status_data is None or status_data.get("status", None) == None:
# select all with keyword
for query in queries:
for query in container_queries:
df = query_results[query]
if len(df) == 0:
# set validate item // no value
@@ -141,7 +147,7 @@ def get_validate_df(benchmark_filename, query_response):
repetitions = result["repetitions"]
for rep in repetitions:
podname = rep["pod"]
for query in queries:
for query in container_queries:
df = query_results[query]
if len(df) == 0:
# set validate item // no value
@@ -167,18 +173,52 @@ def get_validate_df(benchmark_filename, query_response):
energy_queries = [query for query in query_results.keys() if "_joules" in query]
for query in energy_queries:
df = query_results[query]
filtered_df = df.copy()
if len(df) == 0:
# set validate item // no value
item = dict()
item["pod"] = ""
item["scenarioID"] = ""
item["query"] = query
item["count"] = 0
item[">0"] = 0
item["total"] = 0
items += [item]
continue
# set validate item
item = dict()
item["pod"] = ""
item["scenarioID"] = ""
item["query"] = query
item["count"] = len(filtered_df)
item[">0"] = len(filtered_df[filtered_df[query] > 0])
item["total"] = filtered_df[query].max()
item["count"] = len(df)
item[">0"] = len(df[df[query] > 0])
item["total"] = df[query].max()
items += [item]
other_queries = [query for query in query_results.keys() if query not in container_queries and query not in energy_queries]
for query in other_queries:
df = query_results[query]
if len(df) == 0:
# set validate item // no value
item = dict()
item["pod"] = benchmark_filename
item["scenarioID"] = ""
item["query"] = query
item["count"] = 0
item[">0"] = 0
item["total"] = 0
items += [item]
continue
# set validate item
item = dict()
item["pod"] = benchmark_filename
item["scenarioID"] = ""
item["query"] = query
item["count"] = len(df)
item[">0"] = len(df[df[query] > 0])
item["total"] = df[query].max()
items += [item]
validate_df = pd.DataFrame(items)
print(validate_df.groupby(["scenarioID", "query"]).sum()[["count", ">0"]])
if not validate_df.empty:
print(validate_df.groupby(["scenarioID", "query"]).sum()[["count", ">0"]])
return validate_df

def check_ot_fg(args, valid_fg):
@@ -207,9 +247,27 @@ def query(args):
prom = PrometheusConnect(url=args.server, headers=PROM_HEADERS, disable_ssl=PROM_SSL_DISABLE)
benchmark_filename = args.input
if benchmark_filename == "":
print("Query last {} interval.".format(args.interval))
end = datetime.datetime.now()
start = end - datetime.timedelta(seconds=args.interval)
print("need name of the benchmark: sample, stressng, or customBenchmark")
exit()
filepath = os.path.join(data_path, benchmark_filename+".json")
if not os.path.isfile(filepath):
if args.start_time != "" and args.end_time != "":
# by [start time, end time]
print("Query from start_time {} to end_time {}.".format(args.start_time, args.end_time))
start = datetime.datetime.strptime(args.start_time, '%Y-%m-%dT%H:%M:%SZ')
end = datetime.datetime.strptime(args.end_time , '%Y-%m-%dT%H:%M:%SZ')
else:
# by interval
print("Query last {} interval.".format(args.interval))
end = datetime.datetime.now(datetime.timezone.utc)
start = end - datetime.timedelta(seconds=args.interval)
# save benchmark
item = dict()
item["startTimeUTC"] = start.strftime("%Y-%m-%dT%H:%M:%SZ")
item["endTimeUTC"] = end.strftime("%Y-%m-%dT%H:%M:%SZ")
save_json(path=data_path, name=benchmark_filename, data=item)
start = datetime.datetime.strptime(item["startTimeUTC"], '%Y-%m-%dT%H:%M:%SZ') - UTC_OFFSET_TIMEDELTA
end = datetime.datetime.strptime(item["endTimeUTC"], '%Y-%m-%dT%H:%M:%SZ') - UTC_OFFSET_TIMEDELTA
else:
print("Query from {}.".format(benchmark_filename))
start, end = extract_time(benchmark_filename)
@@ -227,10 +285,9 @@ def query(args):
response = _range_queries(prom, queries, start, end, args.step, None)
save_json(path=data_path, name=args.output, data=response)
# try validation if applicable
if benchmark_filename != "" and args.metric_prefix == KEPLER_METRIC_PREFIX:
validate_df = get_validate_df(benchmark_filename, response)
summary_validation(validate_df)
save_csv(path=data_path, name=args.output + "_validate_result", data=validate_df)
validate_df = get_validate_df(benchmark_filename, response)
summary_validation(validate_df)
save_csv(path=data_path, name=args.output + "_validate_result", data=validate_df)

def validate(args):
if not args.benchmark:
@@ -655,7 +712,7 @@ def plot(args):
energy_sources = args.energy_source.split(",")
output_folder = os.path.join(data_path, args.output)
if not os.path.exists(output_folder):
os.mkdir(output_folder)
os.makedirs(output_folder, exist_ok=True)
if args.target_data == "preprocess":
data_saved_path = get_preprocess_folder(pipeline_path)
feature_plot = []
@@ -870,9 +927,11 @@ def plot_scenario(args):
# Query arguments
parser.add_argument("-s", "--server", type=str, help="Specify prometheus server.", default=PROM_SERVER)
parser.add_argument("--interval", type=int, help="Specify query interval.", default=PROM_QUERY_INTERVAL)
parser.add_argument("--start-time", type=str, help="Specify query start time.", default=PROM_QUERY_START_TIME)
parser.add_argument("--end-time", type=str, help="Specify query end time.", default=PROM_QUERY_END_TIME)
parser.add_argument("--step", type=str, help="Specify query step.", default=PROM_QUERY_STEP)
parser.add_argument("--metric-prefix", type=str, help="Specify metrix prefix to filter.", default=KEPLER_METRIC_PREFIX)
parser.add_argument("-tm", "--thirdparty-metrics", nargs='+', help="Specify the thirdparty metrics that not included by Kepler", default="")
parser.add_argument("-tm", "--thirdparty-metrics", nargs='+', help="Specify the thirdparty metrics that are not included by Kepler", default="")

# Train arguments
parser.add_argument("-p", "--pipeline-name", type=str, help="Specify pipeline name.")
41 changes: 36 additions & 5 deletions model_training/README.md
@@ -54,7 +54,31 @@ This is only for testing purpose.

It might take an hour to run and collect all benchmarks. Output, including the CPE CR and the Prometheus query response, will be in the `data` folder by default.

## 3. Profile and train
## 3. Collect metrics without running benchmark
Users might want to run a custom benchmark outside of `kepler-model-server` and collect metrics for training the model using `kepler-model-server`.

### Custom Benchmark

./script.sh custom_collect

Use either the `interval` option or the [`start_time`, `end_time`] pair to set the desired time window for collecting metrics from Prometheus; a sketch of both modes follows.
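
The sketch below is illustrative only. The `--start-time`, `--end-time`, and `--interval` flags come from `cmd/main.py` in this change; the Prometheus URL (`-s`) and the time values are placeholders. When collecting through `./script.sh custom_collect`, the time window is instead expected to come from the `PROM_QUERY_START_TIME`/`PROM_QUERY_END_TIME` or `PROM_QUERY_INTERVAL` settings in `src/util/prom_types.py`; how `getConfig` resolves them (for example from environment variables) depends on your deployment.

```
# Option A: explicit window via CLI flags, in the UTC format parsed by cmd/main.py
python ../cmd/main.py query \
  -i customBenchmark \
  -o customBenchmark_kepler_query \
  -s http://localhost:9090 \
  --start-time 2023-11-13T02:00:00Z \
  --end-time 2023-11-13T03:00:00Z

# Option B: query the last N seconds instead of an explicit window
python ../cmd/main.py query -i customBenchmark -o customBenchmark_kepler_query --interval 3600
```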

## 4. Validate collected metrics
Validation of metrics happens by default at the time of their collection. It is also possible to validate the collected metrics explicitly.

### Quick sample

./script.sh validate sample

### Full run

./script.sh validate stressng

### Custom benchmark

./script.sh validate customBenchmark

## 5. Profile and train

You can train the model using the docker image, which requires no environment setup but may be constrained by docker limitations, or train the model natively by setting up your python environment as follows.

@@ -73,6 +97,12 @@ You can train the model by using docker image which require no environment setup
./script.sh train
```

#### Custom benchmark

```
./script.sh custom_train
```
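
If `script.sh` applies the same `NATIVE` convention to training as it does to the collect and export steps (an assumption; verify against your copy of the script), a native run might look like:

```
# Sketch only: assumes the python environment from the native setup above
# and that train_model honors the NATIVE flag the same way collect_data does.
NATIVE="true" ./script.sh custom_train
```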

Training output will be in `/data` folder by default. The folder contains:
- preprocessed data from Prometheus query response
- profiles
@@ -106,20 +136,21 @@ Run

1. Fork `kepler-model-db`.

1. Validate and make a copy by export command. Need to define `machine id`, `local path to forked kepler-model-db/models`, and `author github account`.
1. Validate and make a copy by export command. Need to define `machine id`, `local path to forked kepler-model-db/models`, `author github account` and `benchmark type`.

Run
```
./script.sh export <machine id> <path to kepler-model-db/models> <author github account>
./script.sh export <machine id> <path to kepler-model-db/models> <author github account> <benchmark type>
```

If you are agree to also share the raw data (preprocessed data and archived file of full pipeline), run
If you also agree to share the raw data (preprocessed data and archived file of full pipeline), run

```
./script.sh export_with_raw <machine id> <path to kepler-model-db/models> <author github account>
./script.sh export_with_raw <machine id> <path to kepler-model-db/models> <author github account> <benchmark type>
```

- Set `NATIVE="true"` to export natively.
- Benchmark type accepts one of the values `sample`, `stressng`, or `customBenchmark`.
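
As a purely illustrative example, with a hypothetical machine id, local path, and GitHub account, an export of a custom-benchmark pipeline could look like:

```
# placeholder values; substitute your own machine id, models path, and account
./script.sh export my-machine-01 ~/kepler-model-db/models my-github-user customBenchmark
```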

2. Add information about your machine in `./models/README.md` in `kepler-model-db`. You may omit any column as needed.
3. Push a PR to `kepler-model-db`.
36 changes: 23 additions & 13 deletions model_training/script.sh
@@ -130,16 +130,18 @@ function collect_data() {
BENCHMARK=$1
BENCHMARK_NS=$2
SLEEP_TIME=$3
kubectl apply -f benchmark/${BENCHMARK}.yaml
wait_for_benchmark ${BENCHMARK} ${BENCHMARK_NS} ${SLEEP_TIME}
save_benchmark ${BENCHMARK} ${BENCHMARK_NS}
if [ "$BENCHMARK" != "customBenchmark" ]; then
kubectl apply -f benchmark/${BENCHMARK}.yaml
wait_for_benchmark ${BENCHMARK} ${BENCHMARK_NS} ${SLEEP_TIME}
save_benchmark ${BENCHMARK} ${BENCHMARK_NS}
kubectl delete -f benchmark/${BENCHMARK}.yaml
fi
ARGS="-i ${BENCHMARK} -o ${BENCHMARK}_kepler_query -s ${PROM_SERVER}"
if [ -z "$NATIVE" ]; then
docker run --rm -v $CPE_DATAPATH:/data --network=host ${ENTRYPOINT_IMG} query ${ARGS}|| true
else
python ../cmd/main.py query ${ARGS}|| true
fi
kubectl delete -f benchmark/${BENCHMARK}.yaml
}

function deploy_prom_dependency(){
@@ -176,18 +178,26 @@ function collect() {
collect_data stressng cpe-operator-system 60
}

function custom_collect() {
collect_data customBenchmark
}

function quick_collect() {
collect_data sample cpe-operator-system 10
}

function train() {
train_model stressng_kepler_query ${VERSION}
train_model stressng_kepler_query ${VERSION}_stressng
}

function quick_train() {
train_model sample_kepler_query ${VERSION}_sample
}

function custom_train() {
train_model customBenchmark_kepler_query ${VERSION}_customBenchmark
}

function validate() {
BENCHMARK=$1
ARGS="-i ${BENCHMARK}_kepler_query --benchmark ${BENCHMARK}"
@@ -202,16 +212,16 @@ function _export() {
ID=$1
OUTPUT=$2
PUBLISHER=$3
INCLUDE_RAW=$4
MAIN_COLLECT_INPUT=$4
INCLUDE_RAW=$5

if [ $# -lt 3 ]; then
echo "need arguements: [machine_id] [path_to_models] [publisher]"
if [ $# -lt 4 ]; then
echo "need arguements: [machine_id] [path_to_models] [publisher] [benchmark_name]"
exit 2
fi

PIPELINE_NAME=${PIPELINE_PREFIX}${VERSION}
VALIDATE_INPUT="stressng_kepler_query"
MAIN_COLLECT_INPUT="stressng"
PIPELINE_NAME=${PIPELINE_PREFIX}${VERSION}_${MAIN_COLLECT_INPUT}
VALIDATE_INPUT="${MAIN_COLLECT_INPUT}_kepler_query"
ARGS="--id ${ID} -p ${PIPELINE_NAME} -i ${VALIDATE_INPUT} --benchmark ${MAIN_COLLECT_INPUT} --version ${VERSION} --publisher ${PUBLISHER} ${INCLUDE_RAW}"
echo "${ARGS}"
if [ -z "$NATIVE" ]; then
@@ -222,11 +232,11 @@
}

function export() {
_export $1 $2 $3
_export $1 $2 $3 $4
}

function export_with_raw() {
_export $1 $2 $3 "--include-raw true"
_export $1 $2 $3 $4 "--include-raw true"
}

function cleanup() {
2 changes: 1 addition & 1 deletion src/train/exporter/validator.py
@@ -7,7 +7,7 @@
from loader import load_train_args
from config import ERROR_KEY

required_benchmark = ["stressng_kepler_query"]
required_benchmark = ["sample_kepler_query", "stressng_kepler_query", "customBenchmark_kepler_query"]

default_threshold_percent = 20

4 changes: 4 additions & 0 deletions src/util/prom_types.py
@@ -7,13 +7,17 @@
PROM_HEADERS = ''
PROM_QUERY_INTERVAL = 300
PROM_QUERY_STEP = 3
PROM_QUERY_START_TIME = '2023-11-13T02:47:57Z'
PROM_QUERY_END_TIME = ''

PROM_SERVER = getConfig('PROM_SERVER', PROM_SERVER)
PROM_HEADERS = getConfig('PROM_HEADERS', PROM_HEADERS)
PROM_HEADERS = None if PROM_HEADERS == '' else PROM_HEADERS
PROM_SSL_DISABLE = True if getConfig('PROM_SSL_DISABLE', PROM_SSL_DISABLE).lower() == 'true' else False
PROM_QUERY_INTERVAL = getConfig('PROM_QUERY_INTERVAL', PROM_QUERY_INTERVAL)
PROM_QUERY_STEP = getConfig('PROM_QUERY_STEP', PROM_QUERY_STEP)
PROM_QUERY_START_TIME = getConfig('PROM_QUERY_START_TIME', PROM_QUERY_START_TIME)
PROM_QUERY_END_TIME = getConfig('PROM_QUERY_END_TIME', PROM_QUERY_END_TIME)

PROM_THIRDPARTY_METRICS = getConfig('PROM_THIRDPARTY_METRICS', "").split(',')

2 changes: 1 addition & 1 deletion src/util/saver.py
@@ -14,7 +14,7 @@ def assure_path(path):
if path == '':
return ''
if not os.path.exists(path):
os.mkdir(path)
os.makedirs(path, exist_ok=True)
return path

def save_json(path, name, data):