Skip to content

Commit

Permalink
update kepler-model-db link, serve by spec, add db CI, visualize powe…
Browse files Browse the repository at this point in the history
…r curve

Signed-off-by: Sunyanan Choochotkaew <[email protected]>
  • Loading branch information
sunya-ch committed Mar 29, 2024
1 parent 0f6f131 commit c8a5f09
Show file tree
Hide file tree
Showing 42 changed files with 766 additions and 199 deletions.
37 changes: 10 additions & 27 deletions .github/workflows/integration-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ on:
description: 'Kepler image tag'
required: true
type: string
additional_opts:
description: 'additional deployment opts'
required: true
type: string

env:
BASE_IMAGE: ${{ inputs.image_repo }}/kepler_model_server_base:${{ inputs.image_tag }}
Expand Down Expand Up @@ -65,45 +69,24 @@ jobs:
curl -s "https://raw.githubusercontent.com/kubernetes-sigs/kustomize/master/hack/install_kustomize.sh" | bash
chmod +x kustomize
mv kustomize /usr/local/bin/
- name: test deploying kepler with only estimator
run: |
make deploy
make e2e-test
make cleanup
env:
OPTS: "ESTIMATOR"
- name: test deploying kepler with only server
run: |
make deploy
make e2e-test
make cleanup
env:
OPTS: "SERVER"
- name: test deploying kepler with estimator and model server
run: |
make deploy
make e2e-test
make cleanup
env:
OPTS: "ESTIMATOR SERVER"
- name: test deploying dummy kepler with only estimator
- name: test deploying with only estimator
run: |
make deploy
make e2e-test
make cleanup
env:
OPTS: "ESTIMATOR TEST"
- name: test deploying dummy kepler with only server
OPTS: "ESTIMATOR${{ inputs.additional_opts }}"
- name: test deploying with only server
run: |
make deploy
make e2e-test
make cleanup
env:
OPTS: "SERVER TEST"
- name: test deploying dummy kepler with estimator and model server
OPTS: "SERVER${{ inputs.additional_opts }}"
- name: test deploying with estimator and model server
run: |
make deploy
make e2e-test
make cleanup
env:
OPTS: "ESTIMATOR SERVER TEST"
OPTS: "ESTIMATOR SERVER${{ inputs.additional_opts }}"
29 changes: 27 additions & 2 deletions .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ jobs:
image_tag: ${{ needs.check-branch.outputs.tag }}
pipeline_name: std_v0.7

integration-test:
integration-test-internal-only:
needs: [check-secret, check-branch, check-change, base-image]
if: always()
uses: ./.github/workflows/integration-test.yml
Expand All @@ -174,4 +174,29 @@ jobs:
docker_secret: ${{ needs.check-secret.outputs.docker-secret }}
image_repo: ${{ vars.IMAGE_REPO || 'docker.io/library' }}
image_tag: ${{ needs.check-branch.outputs.tag }}
kepler_tag: release-0.7.7
kepler_tag: release-0.7.7
additional_opts: " TEST"

integration-test-with-exporter:
needs: [check-secret, check-branch, check-change, base-image]
if: always()
uses: ./.github/workflows/integration-test.yml
with:
base_change: ${{ needs.check-change.outputs.base }}
docker_secret: ${{ needs.check-secret.outputs.docker-secret }}
image_repo: ${{ vars.IMAGE_REPO || 'docker.io/library' }}
image_tag: ${{ needs.check-branch.outputs.tag }}
kepler_tag: release-0.7.7
additional_opts: ""

integration-test-with-exporter-and-db:
needs: [check-secret, check-branch, check-change, base-image]
if: always()
uses: ./.github/workflows/integration-test.yml
with:
base_change: ${{ needs.check-change.outputs.base }}
docker_secret: ${{ needs.check-secret.outputs.docker-secret }}
image_repo: ${{ vars.IMAGE_REPO || 'docker.io/library' }}
image_tag: ${{ needs.check-branch.outputs.tag }}
kepler_tag: release-0.7.7
additional_opts: " DB"
1 change: 1 addition & 0 deletions .github/workflows/unit-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ jobs:
run: make test-pipeline
- name: Test model server
run: make test-model-server
timeout-minutes: 5
- name: Test estimator
run: make test-estimator
timeout-minutes: 5
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ test-estimator: run-estimator run-collector-client clean-estimator

# test estimator --> model-server
run-model-server:
$(CTR_CMD) run -d --platform linux/amd64 -e "MODEL_TOPURL=http://localhost:8110" -v ${MODEL_PATH}:/mnt/models -p 8100:8100 --name model-server $(TEST_IMAGE) /bin/bash -c "python3.8 tests/http_server.py & sleep 5 && python3.8 src/server/model_server.py"
sleep 5
$(CTR_CMD) run -d --platform linux/amd64 -e "MODEL_TOPURL=http://localhost:8110" -v ${MODEL_PATH}:/mnt/models -p 8100:8100 --name model-server $(TEST_IMAGE) /bin/bash -c "python3.8 tests/http_server.py & sleep 10 && python3.8 src/server/model_server.py"
while ! docker logs model-server | grep -q Serving; do echo "waiting for model-server to serve"; sleep 5; done

run-estimator-client:
$(CTR_CMD) exec model-server /bin/bash -c "python3.8 -u ./tests/estimator_model_request_test.py"
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ Compatible version: python 3.8
|Test case|Command|
|---|---|
|[Training pipeline](./tests/README.md#pipeline)|python -u ./tests/pipeline_test.py|
|[Model server](./tests/README.md#estimator-model-request-to-model-server)|Terminal 1: python src/server/model_server.py <br>Terminal 2: python -u tests/estimator_model_request_test.py|
|[Model server](./tests/README.md#estimator-model-request-to-model-server)|Terminal 1: export MODEL_PATH=$(pwd)/tests/models;python src/server/model_server.py <br>Terminal 2: python -u tests/estimator_model_request_test.py|
|[Estimator](./tests/README.md#estimator-power-request-from-collector)|Terminal 1: python src/estimate/estimator.py<br>Terminal 2: python -u tests/estimator_power_request_test.py|
|Estimator with Model Server|Terminal 1: export MODEL_PATH=$(pwd)/tests/models;python src/server/model_server.py <br>Terminal 2: export MODEL_SERVER_URL=http://localhost:8100;export MODEL_SERVER_ENABLE=true;python -u src/estimate/estimator.py<br>Terminal 3: python -u tests/estimator_power_request_test.py
|[Offline Trainer](./tests/README.md#offline-trainer)|Terminal 1: python src/train/offline_trainer.py<br>Terminal 2: python -u tests/offline_trainer_test.py|

For more test information, check [here](./tests/).
Expand Down
2 changes: 1 addition & 1 deletion cmd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ Use kepler model server function as a standalone docker container.
5.3. Plot prediction result on specific trainer model and feature group (`estimate`)

```bash
docker run --rm -v "$(pwd)/data":/data quay.io/sustainable_computing_io/kepler_model_server:v0.7 plot --target-data estimate -i output_kepler_query --model-name GradientBoostingRegressorTrainer_1 --feature-group BPFOnly
docker run --rm -v "$(pwd)/data":/data quay.io/sustainable_computing_io/kepler_model_server:v0.7 plot --target-data estimate -i output_kepler_query --model-name GradientBoostingRegressorTrainer_0 --feature-group BPFOnly
```

5.4. Plot prediction error comparison among feature group and trainer model (`error`)
Expand Down
111 changes: 111 additions & 0 deletions cmd/cmd_plot.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
from util.prom_types import TIMESTAMP_COL
from util import PowerSourceMap

from util.train_types import FeatureGroup, ModelOutputType, weight_support_trainers
from util.loader import load_metadata, load_scaler, get_model_group_path
from train.profiler.node_type_index import NodeTypeIndexCollection
from estimate import load_model
markers = ['o', 's', '^', 'v', '<', '>', 'p', 'P', '*', 'x', '+', '|', '_']

def ts_plot(data, cols, title, output_folder, name, labels=None, subtitles=None, ylabel=None):
plot_height = 3
Expand Down Expand Up @@ -147,4 +152,110 @@ def metadata_plot(args, energy_source, metadata_df, output_folder, name):
plt.legend(frameon=False)
filename = os.path.join(output_folder, name + ".png")
fig.savefig(filename)
plt.close()

def power_curve_plot(args, data_path, energy_source, output_folder, name):
model_toppath = data_path
pipeline_name = args.pipeline_name
pipeline_path = os.path.join(model_toppath, pipeline_name)
node_collection = NodeTypeIndexCollection(pipeline_path)
all_node_types = sorted(list(node_collection.node_type_index.keys()))
output_type = ModelOutputType[args.output_type]
models, _, cpu_ms_max = _load_all_models(model_toppath=model_toppath, output_type=output_type, name=pipeline_name, node_types=all_node_types, energy_source=energy_source)
if len(models) > 0:
_plot_models(models, cpu_ms_max, energy_source, output_folder, name)

def _get_model(model_toppath, trainer, model_node_type, output_type, name, energy_source):
feature_group = FeatureGroup.BPFOnly
model_name = "{}_{}".format(trainer, model_node_type)
group_path = get_model_group_path(model_toppath, output_type, feature_group, energy_source, name)
model_path = os.path.join(group_path, model_name)
model = load_model(model_path)
metadata = load_metadata(model_path)
if metadata is None:
return model, None, None
scaler = load_scaler(model_path)
cpu_ms_max = scaler.max_abs_[0]
return model, metadata, cpu_ms_max

def _load_all_models(model_toppath, output_type, name, node_types, energy_source):
models_dict = dict()
metadata_dict = dict()
cpu_ms_max_dict = dict()
for model_node_type in node_types:
min_mae = None
for trainer in weight_support_trainers:
model, metadata, cpu_ms_max = _get_model(model_toppath, trainer, model_node_type, output_type=output_type, name=name, energy_source=energy_source)
if metadata is None:
continue
cpu_ms_max_dict[model_node_type] = cpu_ms_max
if min_mae is None or min_mae > metadata["mae"]:
min_mae = metadata["mae"]
models_dict[model_node_type], metadata_dict[model_node_type] = model, metadata
return models_dict, metadata_dict, cpu_ms_max_dict

def _plot_models(models, cpu_ms_max, energy_source, output_folder, name, max_plot=15, cpu_time_bin_num=10, sample_num=20):
from util.train_types import BPF_FEATURES
import numpy as np
import pandas as pd
import seaborn as sns
sns.set_palette("Paired")

import matplotlib.pyplot as plt

main_feature_col = BPF_FEATURES[0]
predicted_col = {
"acpi": "default_platform_power",
"intel_rapl": "default_package_power"
}

num_bins = len(cpu_ms_max)//cpu_time_bin_num + 1
nobin = False
if num_bins == 1:
nobin = True
values = np.array(list(cpu_ms_max.values()))
_, bins = np.histogram(values, bins=num_bins)
bin_size = len(bins) + 1 if not nobin else 1
data_with_prediction_list = [[] for _ in range(bin_size)]

num_cols = min(3, bin_size)

for node_type, model in models.items():
# generate data from scaler
xs = np.column_stack((np.linspace(0, cpu_ms_max[node_type], sample_num), np.zeros(sample_num)))
data = pd.DataFrame(xs, columns=models[node_type].estimator.features)
_, data_with_prediction = model.append_prediction(data)
if nobin:
bin_index = 0
else:
bin_index = np.digitize([cpu_ms_max[node_type]], bins)[0]
data_with_prediction_list[bin_index] += [(node_type, data_with_prediction)]
total_graphs = 0
for data_with_predictions in data_with_prediction_list:
total_graphs += int(np.ceil(len(data_with_predictions) / max_plot))
num_rows = int(np.ceil(total_graphs/num_cols))

fig, axes = plt.subplots(num_rows, num_cols, figsize=(int(6*num_cols), int(5*num_rows)))
axes_index = 0
for data_with_predictions in data_with_prediction_list:
index = 0
for data_with_prediction_index in data_with_predictions:
if num_rows == 1 and num_cols == 1:
ax = axes
else:
ax = axes[axes_index//num_cols][axes_index%num_cols]
node_type = data_with_prediction_index[0]
data_with_prediction = data_with_prediction_index[1]
sns.lineplot(data=data_with_prediction, x=main_feature_col, y=predicted_col[energy_source], label="type={}".format(node_type), marker=markers[index], ax=ax)
index += 1
index = index % len(markers)
if index % max_plot == 0:
ax.set_ylabel("Predicted power (W)")
axes_index += 1
if len(data_with_predictions) > 0:
ax.set_ylabel("Predicted power (W)")
axes_index += 1
filename = os.path.join(output_folder, name + ".png")
plt.tight_layout()
fig.savefig(filename)
plt.close()
40 changes: 26 additions & 14 deletions cmd/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@
from util.prom_types import metric_prefix as KEPLER_METRIC_PREFIX, prom_responses_to_results, TIMESTAMP_COL, feature_to_query, update_thirdparty_metrics, node_info_column
from util.extract_types import get_expected_power_columns
from util.train_types import ModelOutputType, FeatureGroups, is_single_source_feature_group, all_feature_groups, default_trainers
from util.loader import load_json, DEFAULT_PIPELINE, load_pipeline_metadata, get_pipeline_path, get_model_group_path, list_pipelines, list_model_names, load_metadata, load_csv, get_preprocess_folder, get_general_filename, load_machine_spec
from util.saver import save_json, save_csv, save_train_args, _pipeline_model_metadata_filename
from util.loader import default_train_output_pipeline, load_json, load_pipeline_metadata, get_pipeline_path, get_model_group_path, list_pipelines, list_model_names, load_metadata, load_csv, get_preprocess_folder, get_general_filename, load_machine_spec
from util.saver import save_json, save_csv, save_train_args, _pipeline_model_metadata_filename, _power_curve_filename
from util.config import ERROR_KEY, model_toppath
from util import get_valid_feature_group_from_queries, PowerSourceMap
from train.prom.prom_query import _range_queries
from train.exporter import exporter
from train import load_class
from train.profiler.node_type_index import NodeTypeIndexCollection, NodeTypeSpec, generate_spec

from cmd_plot import ts_plot, feature_power_plot, summary_plot, metadata_plot
from cmd_plot import ts_plot, feature_power_plot, summary_plot, metadata_plot, power_curve_plot
from cmd_util import extract_time, save_query_results, get_validate_df, summary_validation, get_extractor, check_ot_fg, get_pipeline, assert_train, get_isolator, UTC_OFFSET_TIMEDELTA

import threading
Expand Down Expand Up @@ -215,7 +215,7 @@ def isolate(args):
extracted_data, power_labels = extract(args)
if extracted_data is None or power_labels is None:
return None
pipeline_name = DEFAULT_PIPELINE if not args.pipeline_name else args.pipeline_name
pipeline_name = default_train_output_pipeline if not args.pipeline_name else args.pipeline_name
isolator = get_isolator(data_path, args.isolator, args.profile, pipeline_name, args.target_hints, args.bg_hints, args.abs_pipeline_name)
isolated_data = isolator.isolate(extracted_data, label_cols=power_labels, energy_source=args.energy_source)
if args.output:
Expand Down Expand Up @@ -247,7 +247,7 @@ def isolate_from_data(args):
energy_components = PowerSourceMap[args.energy_source]
extracted_data = load_csv(data_path, "extracted_" + args.input)
power_columns = get_expected_power_columns(energy_components=energy_components)
pipeline_name = DEFAULT_PIPELINE if not args.pipeline_name else args.pipeline_name
pipeline_name = default_train_output_pipeline if not args.pipeline_name else args.pipeline_name
isolator = get_isolator(data_path, args.isolator, args.profile, pipeline_name, args.target_hints, args.bg_hints, args.abs_pipeline_name)
isolated_data = isolator.isolate(extracted_data, label_cols=power_columns, energy_source=args.energy_source)
if args.output:
Expand Down Expand Up @@ -365,7 +365,7 @@ def train(args):
elif PROM_THIRDPARTY_METRICS != [""]:
update_thirdparty_metrics(PROM_THIRDPARTY_METRICS)

pipeline_name = DEFAULT_PIPELINE
pipeline_name = default_train_output_pipeline
if args.pipeline_name:
pipeline_name = args.pipeline_name

Expand Down Expand Up @@ -599,14 +599,16 @@ def estimate(args):
- `estimate` passes all arguments to `estimate` function, and plots the predicted time series and correlation between usage and power metrics
- `error` passes all arguments to `estimate` function, and plots the summary of prediction error
- `metadata` plot pipeline metadata
- `curve_power` plot curve power
- --input : specify related path for pipeline metadata
- --energy-source : specify target energy sources (use comma(,) as delimiter)
- --extractor : specify extractor to get preprocessed data of AbsPower model linked to the input data
- --isolator : specify isolator to get preprocessed data of DynPower model linked to the input data
- --pipeline_name : specify pipeline name
"""

def plot(args):
pipeline_name = DEFAULT_PIPELINE if not args.pipeline_name else args.pipeline_name
pipeline_name = default_train_output_pipeline if not args.pipeline_name else args.pipeline_name
pipeline_path = get_pipeline_path(data_path, pipeline_name=pipeline_name)
if not args.target_data:
print("must give target data via --target-data to plot.")
Expand Down Expand Up @@ -691,9 +693,13 @@ def plot(args):
elif args.target_data == "metadata":
for energy_source in energy_sources:
data_filename = _pipeline_model_metadata_filename(energy_source, ot.name)
pipeline_path = get_pipeline_path(data_path, pipeline_name=pipeline_name)
model_metadata_df = load_pipeline_metadata(pipeline_path, energy_source, ot.name)
model_metadata_df = load_pipeline_metadata(args.input, energy_source, ot.name)
metadata_plot(args, energy_source, model_metadata_df, output_folder, data_filename)
elif args.target_data == "power_curve":
for energy_source in energy_sources:
data_filename = _power_curve_filename(energy_source, ot.name)
model_metadata_df = load_pipeline_metadata(args.input, energy_source, ot.name)
power_curve_plot(args, data_path, energy_source, output_folder, data_filename)

"""
export
Expand All @@ -709,6 +715,7 @@ def plot(args):
- custom benchmark in json with `startTimeUTC` and `endTimeUTC` data
- --collect-date : specify collection time manually in UTC
- --input : specify kepler query response file (output of `query` function) - optional
- --zip : specify whether to zip pipeline
"""

def export(args):
Expand Down Expand Up @@ -742,14 +749,18 @@ def export(args):
pipeline_path = get_pipeline_path(data_path, pipeline_name=pipeline_name)

local_export_path = exporter.export(data_path, pipeline_path, output_path, publisher=args.publisher, collect_date=collect_date, inputs=inputs)
args.target_data = "metadata"

args.input = local_export_path
args.output = local_export_path
args.output_type = "AbsPower"
args.energy_source = ",".join(PowerSourceMap.keys())
plot(args)
args.output_type = "DynPower"
plot(args)
for target_data in ["metadata", "power_curve"]:
for ot in ModelOutputType:
args.target_data = target_data
args.output_type = ot.name
plot(args)
if args.zip:
import shutil
shutil.make_archive(local_export_path, 'zip', local_export_path)

"""
plot_scenario
Expand Down Expand Up @@ -886,6 +897,7 @@ def plot_scenario(args):
parser.add_argument("--publisher", type=str, help="Specify github account of model publisher")
parser.add_argument("--include-raw", type=bool, help="Include raw query data")
parser.add_argument("--collect-date", type=str, help="Specify collect date directly")
parser.add_argument("--zip", type=bool, help="Specify whether to zip pipeline", default=False)

parser.add_argument("--id", type=str, help="specify machine id")

Expand Down
Loading

0 comments on commit c8a5f09

Please sign in to comment.