From a088376f0b508e93bccb54e1fb8a544e76bf9676 Mon Sep 17 00:00:00 2001 From: Sunyanan Choochotkaew Date: Thu, 25 Jan 2024 19:27:29 +0900 Subject: [PATCH] add local db, remove kubelet Signed-off-by: Sunyanan Choochotkaew --- .github/workflows/build-push.yml | 134 +++++++------ ...osted.yml => collect-data-self-hosted.yml} | 53 ++++- ...gration-test.yaml => integration-test.yml} | 0 .github/workflows/pull_request.yml | 79 ++++++++ .github/workflows/push.yml | 183 ++++++++++++++++++ .github/workflows/train-model-on-release.yml | 44 ----- .../{retrain-model.yml => train-model.yml} | 104 +++++++--- .../{unittest.yaml => unit-test.yml} | 37 ++-- .gitignore | 4 +- Makefile | 10 +- cmd/cmd_util.py | 52 ++--- dockerfiles/Dockerfile.test | 2 + manifests/test/file-server.yaml | 49 +++++ manifests/test/patch-estimator-sidecar.yaml | 6 + model_training/s3-pusher/s3-pusher.py | 11 +- model_training/tekton/pipelines/collect.yaml | 179 +++++++++++++++++ src/estimate/estimator.py | 10 +- src/server/model_server.py | 15 +- src/util/__init__.py | 2 +- src/util/config.py | 9 +- src/util/loader.py | 10 +- src/util/train_types.py | 24 +-- tests/e2e_test.sh | 38 +++- tests/estimator_model_request_test.py | 45 +++-- tests/http_server.py | 46 +++++ tests/minimal_trainer.py | 18 ++ tests/pipeline_test.py | 18 +- tests/trainer_test.py | 11 +- 28 files changed, 940 insertions(+), 253 deletions(-) rename .github/workflows/{train-model-self-hosted.yml => collect-data-self-hosted.yml} (83%) rename .github/workflows/{integration-test.yaml => integration-test.yml} (100%) create mode 100644 .github/workflows/pull_request.yml create mode 100644 .github/workflows/push.yml delete mode 100644 .github/workflows/train-model-on-release.yml rename .github/workflows/{retrain-model.yml => train-model.yml} (60%) rename .github/workflows/{unittest.yaml => unit-test.yml} (51%) create mode 100644 manifests/test/file-server.yaml create mode 100644 manifests/test/patch-estimator-sidecar.yaml create mode 100644 model_training/tekton/pipelines/collect.yaml create mode 100644 tests/http_server.py create mode 100644 tests/minimal_trainer.py diff --git a/.github/workflows/build-push.yml b/.github/workflows/build-push.yml index 48826215..9f26c848 100644 --- a/.github/workflows/build-push.yml +++ b/.github/workflows/build-push.yml @@ -1,30 +1,78 @@ -name: BuildPushDeployImage +name: Build-Push Kepler Model Server Image on: - push: - branches: [ main ] + workflow_call: + secrets: + docker_username: + description: 'Docker username' + required: false + docker_password: + description: 'Docker password' + required: false + inputs: + base_change: + description: 'Change flag on base image' + required: true + type: string + image_repo: + description: 'The image repo to use' + required: true + type: string + image_tag: + description: 'The image tag to use' + required: true + type: string + push: + description: 'Push image' + required: false + type: string + default: false + +env: + base_change: ${{ inputs.base_change }} + base_image: ${{ inputs.image_repo }}/kepler_model_server_base:${{ inputs.image_tag }} + image: ${{ inputs.image_repo }}/kepler_model_server:${{ inputs.image_tag }} jobs: - changes: + check-secret: runs-on: ubuntu-latest + outputs: - src: ${{ steps.changes.outputs.src }} + available: ${{ steps.check-secret.outputs.available }} + steps: - - uses: actions/checkout@v4 - - uses: dorny/paths-filter@v2 - id: changes - with: - filters: | - src: - - 'dockerfiles/requirements.txt' - - 'dockerfiles/Dockerfile.base' - - '.github/workflows/build-push.yml' + - name: Check Secret + id: check-secret + env: + SECRET: ${{ secrets.docker_password }} + run: | + if [ "$SECRET" == "" ]; then + echo "available=false" >> "$GITHUB_OUTPUT" + else + echo "available=true" >> "$GITHUB_OUTPUT" + fi - buildbase: - needs: changes - if: ${{ needs.changes.outputs.src == 'true' }} + + check-base-exist: runs-on: ubuntu-latest + + outputs: + exists: ${{ steps.check-base-exist.outputs.exists }} + + steps: + - name: Check if Docker base image exists + id: check-base-exist + run: | + if docker pull ${{ env.base_image }}; then + echo "exists=true" >> "$GITHUB_OUTPUT" + else + echo "exists=false" >> "$GITHUB_OUTPUT" + fi + + build-base: + runs-on: ubuntu-latest + needs: [check-secret, check-base-exist] steps: - name: checkout uses: actions/checkout@v4 @@ -32,53 +80,31 @@ jobs: uses: docker/setup-qemu-action@v3 - name: set up Docker Buildx uses: docker/setup-buildx-action@v3 - - name: Login to Quay - if: ${{ (github.repository_owner == 'sustainable-computing-io') && (github.ref == 'refs/heads/main') }} + - name: Login to Docker + if: ${{ needs.check-secret.outputs.available == 'true' }} uses: docker/login-action@v3 with: - registry: quay.io/sustainable_computing_io - username: ${{ secrets.BOT_NAME }} - password: ${{ secrets.BOT_TOKEN }} - - name: Build and push base image + registry: ${{ inputs.image_repo }} + username: ${{ secrets.docker_username }} + password: ${{ secrets.docker_password }} + - name: Build-push base image + if: ${{ (needs.check-secret.outputs.available == 'true') && ((needs.check-base-exist.outputs.exists == 'false') || (env.base_change == 'true')) }} uses: docker/build-push-action@v5 with: context: dockerfiles platforms: linux/amd64 push: true - tags: quay.io/sustainable_computing_io/kepler_model_server_base:v0.7 + tags: ${{ env.base_image }} file: dockerfiles/Dockerfile.base - - name: Build and push Docker image + - name: Replace value in file + if: ${{ (needs.check-secret.outputs.available == 'true') && ((needs.check-base-exist.outputs.exists == 'false') || (env.base_change == 'true')) }} + run: | + sed -i "s|quay.io/sustainable_computing_io/kepler_model_server_base:v0.7|${{ env.base_image }}|" dockerfiles/Dockerfile + - name: Build-push image uses: docker/build-push-action@v5 with: context: . platforms: linux/amd64 - push: true - tags: quay.io/sustainable_computing_io/kepler_model_server:latest,quay.io/sustainable_computing_io/kepler_model_server:v0.7 - file: dockerfiles/Dockerfile - - build: - needs: [changes] - if: ${{ needs.changes.outputs.src == 'false' }} - runs-on: ubuntu-latest - steps: - - name: checkout - uses: actions/checkout@v4 - - name: set up QEMU - uses: docker/setup-qemu-action@v3 - - name: set up Docker Buildx - uses: docker/setup-buildx-action@v3 - - name: Login to Quay - if: ${{ (github.repository_owner == 'sustainable-computing-io') && (github.ref == 'refs/heads/main') }} - uses: docker/login-action@v3 - with: - registry: quay.io/sustainable_computing_io - username: ${{ secrets.BOT_NAME }} - password: ${{ secrets.BOT_TOKEN }} - - name: Build and push Docker image - uses: docker/build-push-action@v5 - with: - context: . - platforms: linux/amd64 - push: true - tags: quay.io/sustainable_computing_io/kepler_model_server:latest,quay.io/sustainable_computing_io/kepler_model_server:v0.7 + push: ${{ inputs.push }} + tags: ${{ env.image }} file: dockerfiles/Dockerfile diff --git a/.github/workflows/train-model-self-hosted.yml b/.github/workflows/collect-data-self-hosted.yml similarity index 83% rename from .github/workflows/train-model-self-hosted.yml rename to .github/workflows/collect-data-self-hosted.yml index ccf5d4d9..19f63831 100644 --- a/.github/workflows/train-model-self-hosted.yml +++ b/.github/workflows/collect-data-self-hosted.yml @@ -1,4 +1,4 @@ -name: Reusable workflow example +name: Self-hosted Collect Data Workflow on: workflow_call: @@ -31,10 +31,19 @@ on: description: 'The instance type to use for the EC2 instance' required: true type: string + model_server_image: + description: 'Kepler Model Server image' + required: true + type: string + push: + description: 'Push model to s3' + required: false + type: string + default: false + env: KUBECONFIG: /root/.kube/config - VERSION: 0.7 jobs: setup-runner: @@ -145,7 +154,8 @@ jobs: kubectl apply -f tasks kubectl apply -f tasks/s3-pusher kubectl apply -f pipelines - - name: Run Tekton Pipeline + - name: Run Tekton Pipeline with S3Push + if: ${{ inputs.push == 'true' }} run: | cat <> "$GITHUB_OUTPUT" + else + echo "tag=$COMMIT" >> "$GITHUB_OUTPUT" + fi + + check-change: + runs-on: ubuntu-latest + + outputs: + base-change: ${{ steps.base-change.outputs.src }} + data-change: ${{ steps.data-change.outputs.src }} + modeling-change: ${{ steps.modeling-change.outputs.src }} + + steps: + - uses: actions/checkout@v4 + - uses: dorny/paths-filter@v2 + id: base-change + with: + filters: | + src: + - 'dockerfiles/requirements.txt' + - 'dockerfiles/Dockerfile.base' + - '.github/workflows/build-push.yml' + - uses: dorny/paths-filter@v2 + id: metric-change + with: + filters: | + src: + - 'src/util/prom_types.py' + - 'src/util/train_types.py' + - 'src/train/prom/**' + - 'model_training/tekton/tasks/stressng-task.yaml' + - 'model_training/tekton/pipelines/collect.yaml' + - 'hack/**' + - '.github/workflows/collect-data-self-hosted.yml' + - uses: dorny/paths-filter@v2 + id: modeling-change + with: + filters: | + src: + - 'src/**' + - 'model_training/**' + - 'hack/**' + - '.github/workflows/collect-data-self-hosted.yml' + - '.github/workflows/train-model.yml' + + check-secret: + runs-on: ubuntu-latest + + outputs: + docker-secret: ${{ steps.check-docker-secret.outputs.available }} + aws-secret: ${{ steps.check-aws-secret.outputs.available }} + + steps: + - name: Check Docker Secret + id: check-docker-secret + env: + DOCKER_SECRET: ${{ secrets.DOCKER_PASSWORD}} + run: | + if [ "$DOCKER_SECRET" == "" ]; then + echo "available=false" >> "$GITHUB_OUTPUT" + else + echo "available=true" >> "$GITHUB_OUTPUT" + fi + - name: Check AWS Secret + id: check-aws-secret + env: + AWS_SECRET: ${{ secrets.AWS_SECRET_ACCESS_KEY}} + run: | + if [ "$AWS_SECRET" == "" ]; then + echo "available=false" >> "$GITHUB_OUTPUT" + else + echo "available=true" >> "$GITHUB_OUTPUT" + fi + + unit-test: + needs: [check-change] + uses: ./.github/workflows/unit-test.yml + with: + base_change: ${{ needs.check-change.outputs.base-change }} + + build-push: + needs: [check-branch, check-secret, check-change] + uses: ./.github/workflows/build-push.yml + if: ${{ needs.check-secret.outputs.docker-secret == 'true' }} + with: + base_change: ${{ needs.check-change.outputs.base-change }} + image_repo: ${{ vars.IMAGE_REPO || 'docker.io/library' }} + image_tag: ${{ needs.check-branch.outputs.tag }} + push: true + secrets: + docker_username: ${{ secrets.DOCKER_USERNAME }} + docker_password: ${{ secrets.DOCKER_PASSWORD }} + + build-only: + needs: [check-branch, check-secret, check-change] + uses: ./.github/workflows/build-push.yml + if: ${{ needs.check-secret.outputs.docker-secret == 'false' }} + with: + base_change: false + image_repo: ${{ vars.IMAGE_REPO || 'docker.io/library' }} + image_tag: ${{ needs.check-branch.outputs.tag }} + push: false + + collect-data: + if: ${{ (needs.check-secret.outputs.aws-secret == 'true') && (needs.check-change.outputs.data-change == 'true') && (github.ref_name == 'main') }} + needs: [check-secret, check-change, build-push] + uses: ./.github/workflows/collect-data-self-hosted.yml + strategy: + matrix: + instance_type: [i3.metal] + max-parallel: 1 + with: + instance_type: ${{ matrix.instance_type }} + ami_id: 'ami-0e4d0bb9670ea8db0' + github_repo: ${{ github.repository }} + model_server_image: ${{ vars.IMAGE_REPO || 'docker.io/library' }}/kepler-model-server:${{ needs.check-branch.outputs.tag }} + push: true + secrets: + self_hosted_github_token: ${{ secrets.GH_SELF_HOSTED_RUNNER_TOKEN }} + aws_access_key_id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws_secret_access_key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + security_group_id: ${{ secrets.AWS_SECURITY_GROUP_ID }} + aws_region: ${{ secrets.AWS_REGION }} + + train-model: + needs: [check-secret, check-change, collect-data] + if: ${{ (needs.check-secret.outputs.aws-secret == 'true') && (needs.check-change.outputs.modeling-change == 'true') && (github.ref_name == 'main') }} + strategy: + matrix: + instance_type: [i3.metal] + max-parallel: 1 + uses: ./.github/workflows/train-model.yml + with: + instance_type: ${{ matrix.instance_type }} + ami_id: 'ami-0e4d0bb9670ea8db0' + github_repo: ${{ github.repository }} + model_server_image: ${{ vars.IMAGE_REPO || 'docker.io/library' }}/kepler-model-server:${{ needs.check-branch.outputs.tag }} + push: true + secrets: + self_hosted_github_token: ${{ secrets.GH_SELF_HOSTED_RUNNER_TOKEN }} + aws_access_key_id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws_secret_access_key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws_region: ${{ secrets.AWS_REGION }} \ No newline at end of file diff --git a/.github/workflows/train-model-on-release.yml b/.github/workflows/train-model-on-release.yml deleted file mode 100644 index b5ced33d..00000000 --- a/.github/workflows/train-model-on-release.yml +++ /dev/null @@ -1,44 +0,0 @@ -name: Train Power Model on Self-hosted Runner - -on: - push: - branches: - - v*-release - -jobs: - check-secret: - runs-on: ubuntu-latest - - outputs: - available: ${{ steps.check-secret.outputs.available }} - - steps: - - name: Check Secret - id: check-secret - env: - SECRET: ${{ secrets.AWS_SECRET_ACCESS_KEY}} - run: | - if [ "$SECRET" == "" ]; then - echo "available=false" >> "$GITHUB_OUTPUT" - else - echo "available=true" >> "$GITHUB_OUTPUT" - fi - - train-model: - if: ${{ needs.check-secret.outputs.available == 'true' }} - strategy: - matrix: - instance_type: [i3.metal, c6i.metal] - max-parallel: 1 - - uses: ./.github/workflows/train-model-self-hosted.yml - with: - instance_type: ${{ matrix.instance_type }} - ami_id: 'ami-0e4d0bb9670ea8db0' - github_repo: ${{ github.repository }} - secrets: - self_hosted_github_token: ${{ secrets.GH_SELF_HOSTED_RUNNER_TOKEN }} - aws_access_key_id: ${{ secrets.AWS_ACCESS_KEY_ID }} - aws_secret_access_key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} - security_group_id: ${{ secrets.AWS_SECURITY_GROUP_ID }} - aws_region: ${{ secrets.AWS_REGION }} diff --git a/.github/workflows/retrain-model.yml b/.github/workflows/train-model.yml similarity index 60% rename from .github/workflows/retrain-model.yml rename to .github/workflows/train-model.yml index e18ff08b..f869789e 100644 --- a/.github/workflows/retrain-model.yml +++ b/.github/workflows/train-model.yml @@ -1,19 +1,49 @@ -name: Retrain Power on GitHub Runner +name: Train Power Model on GitHub Runner on: - push: - paths: - - src - - hack - - model_training - - .github/workflows/retrain-model.yml - + workflow_call: + secrets: + self_hosted_github_token: + description: 'The GitHub token to use' + required: true + aws_access_key_id: + description: 'The AWS access key id to use' + required: true + aws_secret_access_key: + description: 'The AWS secret access key to use' + required: true + aws_region: + description: 'The AWS region to use' + required: true + inputs: + github_repo: + description: 'The GitHub repo to use' + required: true + type: string + ami_id: + description: 'The AMI ID to use for the EC2 instance' + required: true + type: string + instance_type: + description: 'The instance type to use for the EC2 instance' + required: true + type: string + model_server_image: + description: 'Kepler Model Server image' + required: true + type: string + push: + description: 'Push model to s3' + required: false + type: string + default: false + env: - AMI_ID: 'ami-0e4d0bb9670ea8db0' + AMI_ID: ${{ inputs.ami_id }} VERSION: 0.7 - AWS_REGION: ${{ secrets.AWS_REGION }} - AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} - AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + AWS_REGION: ${{ secrets.aws_region }} + AWS_ACCESS_KEY_ID: ${{ secrets.aws_access_key_id }} + AWS_SECRET_ACCESS_KEY: ${{ secrets.aws_secret_access_key }} KIND_CLUSTER_NAME: kind-for-training jobs: @@ -24,11 +54,6 @@ jobs: outputs: available: ${{ steps.check-data.outputs.available }} - strategy: - matrix: - instance_type: [i3.metal, c6i.metal] - max-parallel: 1 - steps: - name: Checkout id: checkout @@ -48,7 +73,7 @@ jobs: if [ "$AWS_SECRET_ACCESS_KEY" == "" ]; then echo "available=false" >> "$GITHUB_OUTPUT" else - if ./hack/aws_helper.sh check_data ${{ matrix.instance_type }}-${AMI_ID}; then + if ./hack/aws_helper.sh check_data ${{ inputs.instance_type }}-${AMI_ID}; then echo "available=true" >> "$GITHUB_OUTPUT" else echo "available=false" >> "$GITHUB_OUTPUT" @@ -60,10 +85,6 @@ jobs: if: ${{ needs.check-data.outputs.available == 'true' }} runs-on: ubuntu-latest - strategy: - matrix: - instance_type: [c6i.metal, i3.metal] - steps: - name: Checkout id: checkout @@ -85,7 +106,7 @@ jobs: - name: Load data run: | export HOST_MNT_PATH=$(pwd)/mnt - ./hack/aws_helper.sh load_data ${{ matrix.instance_type }}-${AMI_ID} + ./hack/aws_helper.sh load_data ${{ inputs.instance_type }}-${AMI_ID} docker exec --privileged "${KIND_CLUSTER_NAME}"-control-plane mkdir -p /mnt/data docker cp $(pwd)/mnt/data/idle.json "${KIND_CLUSTER_NAME}"-control-plane:/mnt/data/idle.json docker cp $(pwd)/mnt/data/kepler_query.json "${KIND_CLUSTER_NAME}"-control-plane:/mnt/data/kepler_query.json @@ -123,7 +144,8 @@ jobs: kubectl apply -f tasks/s3-pusher kubectl apply -f pipelines - - name: Run Tekton Pipeline + - name: Run Tekton Pipeline with S3Push + if: ${{ inputs.push == 'true' }} run: | cat < estimator run-estimator: - $(CTR_CMD) run -d --platform linux/amd64 -p 8100:8100 --name estimator $(TEST_IMAGE) python3.8 src/estimate/estimator.py + $(CTR_CMD) run -d --platform linux/amd64 -e "MODEL_TOPURL=http://localhost:8110" -v ${MODEL_PATH}:/mnt/models -p 8100:8100 --name estimator $(TEST_IMAGE) /bin/bash -c "python3.8 tests/http_server.py & sleep 5 && python3.8 src/estimate/estimator.py" run-collector-client: $(CTR_CMD) exec estimator /bin/bash -c "while [ ! -S "/tmp/estimator.sock" ]; do sleep 1; done; python3.8 -u ./tests/estimator_power_request_test.py" @@ -46,7 +48,7 @@ test-estimator: run-estimator run-collector-client clean-estimator # test estimator --> model-server run-model-server: - $(CTR_CMD) run -d --platform linux/amd64 -p 8100:8100 --name model-server $(TEST_IMAGE) python3.8 src/server/model_server.py + $(CTR_CMD) run -d --platform linux/amd64 -e "MODEL_TOPURL=http://localhost:8110" -v ${MODEL_PATH}:/mnt/models -p 8100:8100 --name model-server $(TEST_IMAGE) /bin/bash -c "python3.8 tests/http_server.py & sleep 5 && python3.8 src/server/model_server.py" sleep 5 run-estimator-client: diff --git a/cmd/cmd_util.py b/cmd/cmd_util.py index f3152713..c72f33a9 100644 --- a/cmd/cmd_util.py +++ b/cmd/cmd_util.py @@ -10,8 +10,8 @@ src_path = os.path.join(os.path.dirname(__file__), '..', 'src') sys.path.append(src_path) -from util.prom_types import node_info_column, prom_responses_to_results -from util.train_types import ModelOutputType, FeatureGroup +from util.prom_types import node_info_column, prom_responses_to_results, SOURCE_COL, energy_component_to_query +from util.train_types import ModelOutputType, FeatureGroup, PowerSourceMap from util.loader import load_json, get_pipeline_path from util.saver import assure_path, save_csv @@ -55,14 +55,15 @@ def summary_validation(validate_df): items = [] metric_to_validate_pod = { "cgroup": "kepler_container_cgroupfs_cpu_usage_us_total", - # "hwc": "kepler_container_cpu_instructions_total", - "hwc": "kepler_container_cpu_cycles_total", + # CPU instruction is mainly used for ratio. + # reference: https://github.com/sustainable-computing-io/kepler/blob/0b328cf7c79db9a11426fb80a1a922383e40197c/pkg/config/config.go#L92 + "hwc": "kepler_container_cpu_instructions_total", "kubelet": "kepler_container_kubelet_cpu_usage_total", "bpf": "kepler_container_bpf_cpu_time_us_total", } metric_to_validate_power = { - "rapl": "kepler_node_package_joules_total", - "platform": "kepler_node_platform_joules_total" + "intel_rapl": "kepler_node_package_joules_total", + "acpi": "kepler_node_platform_joules_total" } for metric, query in metric_to_validate_pod.items(): target_df = validate_df[validate_df["query"]==query] @@ -166,28 +167,31 @@ def get_validate_df(data_path, benchmark_filename, query_response): item["total"] = filtered_df[query].max() items += [item] energy_queries = [query for query in query_results.keys() if "_joules" in query] - for query in energy_queries: - df = query_results[query] - if len(df) == 0: - # set validate item // no value + for energy_source, energy_components in PowerSourceMap.items(): + for component in energy_components: + query = energy_component_to_query(component) + df = query_results[query] + df = df[df[SOURCE_COL] == energy_source] + if len(df) == 0: + # set validate item // no value + item = dict() + item["pod"] = "" + item["scenarioID"] = energy_source + item["query"] = query + item["count"] = 0 + item[">0"] = 0 + item["total"] = 0 + items += [item] + continue + # set validate item item = dict() item["pod"] = "" - item["scenarioID"] = "" + item["scenarioID"] = energy_source item["query"] = query - item["count"] = 0 - item[">0"] = 0 - item["total"] = 0 + item["count"] = len(df) + item[">0"] = len(df[df[query] > 0]) + item["total"] = df[query].max() items += [item] - continue - # set validate item - item = dict() - item["pod"] = "" - item["scenarioID"] = "" - item["query"] = query - item["count"] = len(df) - item[">0"] = len(df[df[query] > 0]) - item["total"] = df[query].max() - items += [item] other_queries = [query for query in query_results.keys() if query not in container_queries and query not in energy_queries] for query in other_queries: df = query_results[query] diff --git a/dockerfiles/Dockerfile.test b/dockerfiles/Dockerfile.test index 93791930..e6fb2db7 100644 --- a/dockerfiles/Dockerfile.test +++ b/dockerfiles/Dockerfile.test @@ -15,6 +15,8 @@ RUN mkdir -p tests/data COPY tests/data/prom_output tests/data/prom_output COPY tests/*.py tests/ +RUN mkdir -p /mnt/models + # port for Model Server EXPOSE 8100 # port for Online Trainer (TODO: reserved for event-based online training) diff --git a/manifests/test/file-server.yaml b/manifests/test/file-server.yaml new file mode 100644 index 00000000..98df4061 --- /dev/null +++ b/manifests/test/file-server.yaml @@ -0,0 +1,49 @@ +apiVersion: v1 +kind: Pod +metadata: + name: model-db + namespace: kepler + labels: + app.kubernetes.io/component: model-db +spec: + containers: + - name: file-server + image: localhost:5001/kepler_model_server:devel-test + imagePullPolicy: IfNotPresent + command: [ "/bin/sh" ] + args: ["-c", "python3.8 -u tests/http_server.py"] + ports: + - containerPort: 8110 + name: http + volumeMounts: + - name: mnt + mountPath: /mnt + initContainers: + - name: trainer + image: localhost:5001/kepler_model_server:devel-test + imagePullPolicy: IfNotPresent + command: [ "/bin/sh" ] + args: ["-c", "python3.8 -u tests/minimal_trainer.py"] + volumeMounts: + - name: mnt + mountPath: /mnt + # Add other init container configurations here + volumes: + - name: mnt + emptyDir: {} +--- +kind: Service +apiVersion: v1 +metadata: + name: model-db + namespace: kepler + labels: + app.kubernetes.io/component: model-db +spec: + clusterIP: None + selector: + app.kubernetes.io/component: model-db + ports: + - name: http + port: 8110 + targetPort: http \ No newline at end of file diff --git a/manifests/test/patch-estimator-sidecar.yaml b/manifests/test/patch-estimator-sidecar.yaml new file mode 100644 index 00000000..d93e9883 --- /dev/null +++ b/manifests/test/patch-estimator-sidecar.yaml @@ -0,0 +1,6 @@ +data: + MODEL_CONFIG: | + NODE_COMPONENTS_ESTIMATOR=true + NODE_COMPONENTS_INIT_URL=http://model-db.kepler.svc.cluster.local:8110/std_v0.7/intel_rapl/AbsPower/BPFOnly/GradientBoostingRegressorTrainer_1.zip + NODE_TOTAL_ESTIMATOR=true + NODE_TOTAL_INIT_URL=http://model-db.kepler.svc.cluster.local:8110/std_v0.7/acpi/AbsPower/BPFOnly/GradientBoostingRegressorTrainer_1.zip \ No newline at end of file diff --git a/model_training/s3-pusher/s3-pusher.py b/model_training/s3-pusher/s3-pusher.py index 8acd1546..222b5a28 100644 --- a/model_training/s3-pusher/s3-pusher.py +++ b/model_training/s3-pusher/s3-pusher.py @@ -28,11 +28,12 @@ def get_bucket_file_map(machine_id, mnt_path, query_data, idle_data): top_key_path = "" if machine_id is not None and machine_id != "": top_key_path = "/" + machine_id - for root, _, files in os.walk(model_path): - for file in files: - filepath = os.path.join(root, file) - key = filepath.replace(model_path, top_key_path + "/models") - bucket_file_map[key] = filepath + if os.path.exists(model_path): + for root, _, files in os.walk(model_path): + for file in files: + filepath = os.path.join(root, file) + key = filepath.replace(model_path, top_key_path + "/models") + bucket_file_map[key] = filepath data_path = os.path.join(mnt_path, data_dir) for data_filename in [query_data, idle_data]: if data_filename is not None: diff --git a/model_training/tekton/pipelines/collect.yaml b/model_training/tekton/pipelines/collect.yaml new file mode 100644 index 00000000..f78c55a2 --- /dev/null +++ b/model_training/tekton/pipelines/collect.yaml @@ -0,0 +1,179 @@ +############################################## +## +## collect-data-pipeline: +## +## - presteps (collect metrics at idle state and record start time) +## - run stressng workloads (tasks/stressng-task.yaml) +## - collect metrics (record end time and collect metrics when running stressng) +## - push to s3 +## +############################################## +apiVersion: tekton.dev/v1 +kind: Pipeline +metadata: + name: collect-data-pipeline +spec: + workspaces: + - name: mnt + description: Mount path + params: + - name: MODEL_SERVER_IMAGE + description: Specify model server image + default: quay.io/sustainable_computing_io/kepler_model_server:v0.7 + - name: IDLE_COLLECT_INTERVAL + description: Specify interval time to collect profile (idle) data before start the workload + default: 100 + - name: STRESS_BREAK_INTERVAL + description: Specify break interval between each stress load + default: 5 + - name: STRESS_TIMEOUT + description: Specify stress duration (timeout to stop stress) + default: 20 + - name: STRESS_ARGS + description: List arguments for CPU frequency and stressng workload (CPU_FREQUENCY;STRESS_LOAD;STRESS_INSTANCE_NUM;STRESS_EXTRA_PARAM_KEYS;STRESS_EXTRA_PARAM_VALS) + type: array + default: + - "cpu;none;none" + - "branch;none;none" + - "regs;none;none" + - "l1cache;none;none" + - "cache;none;none" + # - "stream;none;none" + # - "vm-rw;vm-rw-bytes;4G" + # - "sctp;none;none" + - name: THIRDPARTY_METRICS + description: Specify list of third party metric to export (required only for ThirdParty feature group) + default: "" + - name: COS_PROVIDER + description: Specify COS provider (supported choices are ibmcloud, aws) + default: "" + - name: COS_SECRET_NAME + description: Specify COS secret name + default: "" + - name: MACHINE_ID + description: Specify machine id to group model result in bucket + default: "" + tasks: + - name: presteps + params: + - name: IDLE_COLLECT_INTERVAL + value: $(params.IDLE_COLLECT_INTERVAL) + - name: THIRDPARTY_METRICS + value: $(params.THIRDPARTY_METRICS) + - name: MODEL_SERVER_IMAGE + value: $(params.MODEL_SERVER_IMAGE) + taskSpec: + workspaces: + - name: mnt + optional: true + params: + - name: IDLE_COLLECT_INTERVAL + - name: THIRDPARTY_METRICS + - name: MODEL_SERVER_IMAGE + results: + - name: stress-start-time + description: The time recorded before running the workload + steps: + - name: sleep + image: bash:5.2 + script: | + #!/usr/bin/env bash + sleep $(params.IDLE_COLLECT_INTERVAL) + - name: collect-idle + image: $(params.MODEL_SERVER_IMAGE) + args: + - cmd/main.py + - query + - --data-path=$(workspaces.mnt.path)/data + - --interval=$(params.IDLE_COLLECT_INTERVAL) + - --thirdparty-metrics="$(params.THIRDPARTY_METRICS)" + - --benchmark=idle + - -o=idle + command: ["python3.8"] + env: + - name: PROM_SERVER + value: http://prometheus-k8s.monitoring.svc:9090 + - name: record-start-time + image: bash:5.2 + script: | + #!/usr/bin/env bash + echo -n $(date +%Y-%m-%dT%H:%M:%SZ) > $(results.stress-start-time.path) + - name: run-stressng + runAfter: [presteps] + taskRef: + name: run-stressng + timeout: "5h" + params: + - name: INTERVAL + value: $(params.STRESS_BREAK_INTERVAL) + - name: TIMEOUT + value: $(params.STRESS_TIMEOUT) + - name: arguments + value: $(params.STRESS_ARGS[*]) + - name: collect-metric + runAfter: [run-stressng] + params: + - name: THIRDPARTY_METRICS + value: $(params.THIRDPARTY_METRICS) + - name: MODEL_SERVER_IMAGE + value: $(params.MODEL_SERVER_IMAGE) + taskSpec: + workspaces: + - name: mnt + optional: true + params: + - name: BENCHMARK + default: stressng + - name: THIRDPARTY_METRICS + - name: MODEL_SERVER_IMAGE + steps: + - name: collect-stressng + image: $(params.MODEL_SERVER_IMAGE) + args: + - cmd/main.py + - query + - --data-path=$(workspaces.mnt.path)/data + - --start-time=$(tasks.presteps.results.stress-start-time) + - --end-time=$(tasks.run-stressng.results.stress-end-time) + - --thirdparty-metrics="$(params.THIRDPARTY_METRICS)" + - --benchmark=stressng + - -o=kepler_query + command: ["python3.8"] + env: + - name: PROM_SERVER + value: http://prometheus-k8s.monitoring.svc:9090 + finally: + - name: ibmcloud-s3-push + when: + - input: "$(params.COS_PROVIDER)" + operator: in + values: ["ibmcloud"] + - input: "$(params.COS_SECRET_NAME)" + operator: notin + values: [""] + workspaces: + - name: mnt + params: + - name: COS_SECRET_NAME + value: $(params.COS_SECRET_NAME) + - name: MACHINE_ID + value: $(params.MACHINE_ID) + taskRef: + name: ibmcloud-s3-push + - name: aws-s3-push + when: + - input: "$(params.COS_PROVIDER)" + operator: in + values: ["aws"] + - input: "$(params.COS_SECRET_NAME)" + operator: notin + values: [""] + workspaces: + - name: mnt + params: + - name: COS_SECRET_NAME + value: $(params.COS_SECRET_NAME) + - name: MACHINE_ID + value: $(params.MACHINE_ID) + taskRef: + name: aws-s3-push \ No newline at end of file diff --git a/src/estimate/estimator.py b/src/estimate/estimator.py index 09b30d1c..fc464df6 100644 --- a/src/estimate/estimator.py +++ b/src/estimate/estimator.py @@ -54,12 +54,14 @@ def handle_request(data): return {"powers": dict(), "msg": msg} output_type = ModelOutputType[power_request.output_type] - energy_source = power_request.energy_source + # TODO: need revisit if get more than one rapl energy source + if power_request.energy_source is None or 'rapl' in power_request.energy_source: + power_request.energy_source = "intel_rapl" if output_type.name not in loaded_model: loaded_model[output_type.name] = dict() output_path = "" - if energy_source not in loaded_model[output_type.name]: + if power_request.energy_source not in loaded_model[output_type.name]: output_path = get_download_output_path(download_path, power_request.energy_source, output_type) if not os.path.exists(output_path): # try connecting to model server @@ -75,11 +77,11 @@ def handle_request(data): print("load model from config: ", output_path) else: print("load model from model server: ", output_path) - loaded_model[output_type.name][energy_source] = load_downloaded_model(power_request.energy_source, output_type) + loaded_model[output_type.name][power_request.energy_source] = load_downloaded_model(power_request.energy_source, output_type) # remove loaded model shutil.rmtree(output_path) - model = loaded_model[output_type.name][energy_source] + model = loaded_model[output_type.name][power_request.energy_source] powers, msg = model.get_power(power_request.datapoint) if msg != "": print("{} fail to predict, removed: {}".format(model.model_name, msg)) diff --git a/src/server/model_server.py b/src/server/model_server.py index df378e98..05282d58 100644 --- a/src/server/model_server.py +++ b/src/server/model_server.py @@ -14,7 +14,7 @@ from util.train_types import get_valid_feature_groups, ModelOutputType, FeatureGroups, FeatureGroup from util.config import getConfig, model_toppath, ERROR_KEY, MODEL_SERVER_MODEL_REQ_PATH, MODEL_SERVER_MODEL_LIST_PATH, initial_pipeline_url, download_path -from util.loader import parse_filters, is_valid_model, load_json, load_weight, get_model_group_path, get_archived_file, METADATA_FILENAME, CHECKPOINT_FOLDERNAME, get_pipeline_path, any_node_type, is_matched_type +from util.loader import parse_filters, is_valid_model, load_json, load_weight, get_model_group_path, get_archived_file, METADATA_FILENAME, CHECKPOINT_FOLDERNAME, get_pipeline_path, any_node_type, is_matched_type, lr_trainers ############################################### # model request @@ -47,6 +47,8 @@ def select_best_model(valid_groupath, filters, trainer_name="", node_type=any_no f != CHECKPOINT_FOLDERNAME \ and not os.path.isfile(os.path.join(valid_groupath,f)) \ and (trainer_name == "" or trainer_name in f)] + if weight: + model_names = [name for name in model_names if name.split("_")[0] in lr_trainers] # Load metadata of trainers best_cadidate = None best_response = None @@ -81,6 +83,11 @@ def get_model(): model_request = request.get_json() print("get request /model: {}".format(model_request)) req = ModelRequest(**model_request) + energy_source = req.source + # TODO: need revisit if get more than one rapl energy source + if energy_source is None or 'rapl' in energy_source: + energy_source = 'intel_rapl' + # find valid feature groups from available metrics in request valid_fgs = get_valid_feature_groups(req.metrics) # parse filtering conditions of metadata attribute from request if exists (e.g., minimum mae) @@ -90,7 +97,7 @@ def get_model(): best_response = None # find best model comparing best candidate from each valid feature group complied with filtering conditions for fg in valid_fgs: - valid_groupath = get_model_group_path(model_toppath, output_type, fg, req.source) + valid_groupath = get_model_group_path(model_toppath, output_type, fg, energy_source) if os.path.exists(valid_groupath): best_candidate, response = select_best_model(valid_groupath, filters, req.trainer_name, req.node_type, req.weight) if best_candidate is None: @@ -134,8 +141,8 @@ def get_available_models(): output_types = [ot for ot in ModelOutputType] else: output_types = [ModelOutputType[ot]] - - if energy_source is None: + # TODO: need revisit if get more than one rapl energy source + if energy_source is None or 'rapl' in energy_source: energy_source = 'intel_rapl' if filter is None: diff --git a/src/util/__init__.py b/src/util/__init__.py index 20f07372..c9339622 100644 --- a/src/util/__init__.py +++ b/src/util/__init__.py @@ -8,5 +8,5 @@ from saver import assure_path, save_csv, save_json, save_pkl, save_metadata, save_scaler, save_weight from config import getConfig, model_toppath from prom_types import get_valid_feature_group_from_queries -from train_types import SYSTEM_FEATURES, COUNTER_FEAUTRES, CGROUP_FEATURES, BPF_FEATURES, IRQ_FEATURES, KUBELET_FEATURES, WORKLOAD_FEATURES +from train_types import SYSTEM_FEATURES, COUNTER_FEAUTRES, CGROUP_FEATURES, BPF_FEATURES, IRQ_FEATURES, WORKLOAD_FEATURES from train_types import PowerSourceMap, FeatureGroup, FeatureGroups, ModelOutputType, get_feature_group \ No newline at end of file diff --git a/src/util/config.py b/src/util/config.py index 2bb0248f..c13a1e13 100644 --- a/src/util/config.py +++ b/src/util/config.py @@ -13,7 +13,7 @@ ################################################# import os -from loader import get_url, get_pipeline_url +from loader import get_url, get_pipeline_url, default_init_model_url from train_types import ModelOutputType, is_support_output_type # must be writable (for shared volume mount) @@ -63,7 +63,8 @@ def getPath(subpath): CONFIG_PATH = getConfig('CONFIG_PATH', CONFIG_PATH) -initial_pipeline_url = getConfig('INITIAL_PIPELINE_URL', get_pipeline_url()) +model_topurl = getConfig('MODEL_TOPURL', default_init_model_url) +initial_pipeline_url = getConfig('INITIAL_PIPELINE_URL', get_pipeline_url(model_topurl=model_topurl)) model_toppath = getConfig('MODEL_PATH', getPath(MODEL_FOLDERNAME)) download_path = getConfig('MODEL_PATH', getPath(DOWNLOAD_FOLDERNAME)) @@ -118,14 +119,14 @@ def get_energy_source(prefix): return DEFAULT_COMPONENTS_SOURCE # get_init_model_url: get initial model from URL if estimator is enabled -def get_init_model_url(energy_source, output_type): +def get_init_model_url(energy_source, output_type, model_topurl=model_topurl): for prefix in modelConfigPrefix: if get_energy_source(prefix) == energy_source: modelURL = get_init_url(prefix) print("get init url", modelURL) if modelURL == "" and is_support_output_type(output_type): print("init URL is not set, try using default URL".format(output_type)) - return get_url(output_type=ModelOutputType[output_type], energy_source=energy_source) + return get_url(output_type=ModelOutputType[output_type], energy_source=energy_source, model_topurl=model_topurl) else: return modelURL print("no match config for {}, {}".format(output_type, energy_source)) diff --git a/src/util/loader.py b/src/util/loader.py index 9aeca4fc..98577e2e 100644 --- a/src/util/loader.py +++ b/src/util/loader.py @@ -13,15 +13,13 @@ VALUE_DELIMIT = ':' ARRAY_DELIMIT = ',' -DEFAULT_PIPELINE = 'default' CHECKPOINT_FOLDERNAME = 'checkpoint' PREPROCESS_FOLDERNAME = "preprocessed_data" -# TODO: change to v0.7 when the model is updated to database +# TODO: change to v0.7 when the model is updated to database, need document update # default_init_model_url = "https://raw.githubusercontent.com/sustainable-computing-io/kepler-model-db/main/models/v0.7/nx12" -# default_init_pipeline_name = "std_v0.7" +DEFAULT_PIPELINE = "std_v0.7" default_init_model_url = "https://raw.githubusercontent.com/sustainable-computing-io/kepler-model-db/main/models/v0.6/nx12" -default_init_pipeline_name = "std_v0.6" default_trainer_name = "GradientBoostingRegressorTrainer" default_node_type = "1" any_node_type = -1 @@ -243,7 +241,7 @@ def get_download_output_path(download_path, energy_source, output_type): energy_source_path = assure_path(os.path.join(download_path, energy_source)) return os.path.join(energy_source_path, output_type.name) -def get_url(output_type, feature_group=default_feature_group, trainer_name=default_trainer_name, node_type=default_node_type, model_topurl=default_init_model_url, energy_source="intel_rapl", pipeline_name=default_init_pipeline_name, model_name=None, weight=False): +def get_url(output_type, feature_group=default_feature_group, trainer_name=default_trainer_name, node_type=default_node_type, model_topurl=default_init_model_url, energy_source="intel_rapl", pipeline_name=DEFAULT_PIPELINE, model_name=None, weight=False): group_path = get_model_group_path(model_topurl, output_type=output_type, feature_group=feature_group, energy_source=energy_source, pipeline_name=pipeline_name, assure=False) if model_name is None: model_name = get_model_name(trainer_name, node_type) @@ -252,7 +250,7 @@ def get_url(output_type, feature_group=default_feature_group, trainer_name=defau file_ext = ".json" return os.path.join(group_path, model_name + file_ext) -def get_pipeline_url(model_topurl=default_init_model_url, pipeline_name=default_init_pipeline_name, weight=False): +def get_pipeline_url(model_topurl=default_init_model_url, pipeline_name=DEFAULT_PIPELINE, weight=False): file_ext = ".zip" if weight: file_ext = ".json" diff --git a/src/util/train_types.py b/src/util/train_types.py index cd90c867..89d73264 100644 --- a/src/util/train_types.py +++ b/src/util/train_types.py @@ -18,10 +18,9 @@ CGROUP_FEATURES = ["cgroupfs_cpu_usage_us", "cgroupfs_memory_usage_bytes", "cgroupfs_system_cpu_usage_us", "cgroupfs_user_cpu_usage_us"] BPF_FEATURES = ["bpf_cpu_time_us", "bpf_page_cache_hit"] IRQ_FEATURES = ["bpf_block_irq", "bpf_net_rx_irq", "bpf_net_tx_irq"] -KUBELET_FEATURES =['kubelet_memory_bytes', 'kubelet_cpu_usage'] ACCELERATE_FEATURES = ['accelerator_intel_qat'] -WORKLOAD_FEATURES = COUNTER_FEAUTRES + CGROUP_FEATURES + BPF_FEATURES + IRQ_FEATURES + KUBELET_FEATURES + ACCELERATE_FEATURES -BASIC_FEATURES = COUNTER_FEAUTRES + CGROUP_FEATURES + BPF_FEATURES + KUBELET_FEATURES +WORKLOAD_FEATURES = COUNTER_FEAUTRES + CGROUP_FEATURES + BPF_FEATURES + IRQ_FEATURES + ACCELERATE_FEATURES +BASIC_FEATURES = COUNTER_FEAUTRES + CGROUP_FEATURES + BPF_FEATURES PowerSourceMap = { "intel_rapl": ["package", "core", "uncore", "dram"], @@ -44,13 +43,12 @@ class FeatureGroup(enum.Enum): CounterOnly = 3 CgroupOnly = 4 BPFOnly = 5 - KubeletOnly = 6 - IRQOnly = 7 - CounterIRQCombined = 8 - Basic = 9 - BPFIRQ = 10 - AcceleratorOnly = 11 - ThirdParty = 12 + IRQOnly = 6 + CounterIRQCombined = 7 + Basic = 8 + BPFIRQ = 9 + AcceleratorOnly = 10 + ThirdParty = 11 Unknown = 99 class EnergyComponentLabelGroup(enum.Enum): @@ -62,7 +60,6 @@ class EnergyComponentLabelGroup(enum.Enum): class ModelOutputType(enum.Enum): AbsPower = 1 DynPower = 2 - XGBoostStandalonePower = 3 def is_support_output_type(output_type_name): return any(output_type_name == item.name for item in ModelOutputType) @@ -78,14 +75,13 @@ def deep_sort(elements): FeatureGroup.CounterOnly: deep_sort(COUNTER_FEAUTRES), FeatureGroup.CgroupOnly: deep_sort(CGROUP_FEATURES), FeatureGroup.BPFOnly: deep_sort(BPF_FEATURES), - FeatureGroup.KubeletOnly: deep_sort(KUBELET_FEATURES), FeatureGroup.BPFIRQ: deep_sort(BPF_FEATURES + IRQ_FEATURES), FeatureGroup.CounterIRQCombined: deep_sort(COUNTER_FEAUTRES + IRQ_FEATURES), FeatureGroup.Basic: deep_sort(BASIC_FEATURES), FeatureGroup.AcceleratorOnly: deep_sort(ACCELERATE_FEATURES), } -SingleSourceFeatures = [FeatureGroup.CounterOnly.name, FeatureGroup.CgroupOnly.name, FeatureGroup.BPFOnly.name, FeatureGroup.BPFIRQ.name, FeatureGroup.KubeletOnly.name] +SingleSourceFeatures = [FeatureGroup.CounterOnly.name, FeatureGroup.CgroupOnly.name, FeatureGroup.BPFOnly.name, FeatureGroup.BPFIRQ.name] def is_single_source_feature_group(fg): return fg.name in SingleSourceFeatures @@ -96,7 +92,6 @@ def is_single_source_feature_group(fg): FeatureGroup.CounterOnly: "cpu_instructions", FeatureGroup.CgroupOnly: "cgroupfs_cpu_usage_us", FeatureGroup.BPFOnly: "bpf_cpu_time_us", - FeatureGroup.KubeletOnly: "kubelet_cpu_usage", FeatureGroup.BPFIRQ: "bpf_cpu_time_us", FeatureGroup.CounterIRQCombined: "cpu_instructions", FeatureGroup.Basic: "cpu_instructions", @@ -108,7 +103,6 @@ def is_single_source_feature_group(fg): FeatureGroup.CounterOnly: "cache_miss", FeatureGroup.CgroupOnly: "cgroupfs_memory_usage_bytes", FeatureGroup.BPFOnly: "bpf_page_cache_hit", - FeatureGroup.KubeletOnly: "kubelet_memory_bytes", FeatureGroup.BPFIRQ: "bpf_page_cache_hit", FeatureGroup.CounterIRQCombined: "cache_miss", FeatureGroup.Basic: "cache_miss" diff --git a/tests/e2e_test.sh b/tests/e2e_test.sh index 0702ca81..f6198280 100755 --- a/tests/e2e_test.sh +++ b/tests/e2e_test.sh @@ -43,6 +43,11 @@ get_server_log() { kubectl logs -n kepler $(get_component model-server) -c server-api } +get_db_log(){ + kubectl logs -n kepler model-db -c trainer + kubectl logs -n kepler model-db +} + wait_for_kepler() { kubectl rollout status ds kepler-exporter -n kepler --timeout 5m kubectl describe ds -n kepler kepler-exporter @@ -55,6 +60,13 @@ wait_for_server() { wait_for_keyword server "Press CTRL+C to quit" "server has not started yet" } +wait_for_db() { + kubectl get po model-db -n kepler + kubectl wait -n kepler --for=jsonpath='{.status.phase}'=Running pod/model-db --timeout 5m + wait_for_keyword db "Http File Serve Serving" "model-db is not serving" + get_db_log +} + wait_for_keyword() { num_iterations=10 component=$1 @@ -64,16 +76,23 @@ wait_for_keyword() { if grep -q "$keyword" <<< $(get_${component}_log); then return fi - kubectl get po -n kepler -oyaml sleep 2 done echo "timeout ${num_iterations}s waiting for '${keyword}' from ${component} log" echo "Error: $message" + kubectl get po -n kepler -oyaml + echo "${component} log:" get_${component}_log # show all status kubectl get po -A + if [ ! -z ${SERVER} ]; then + get_server_log + fi + if [ ! -z ${ESTIMATOR} ]; then + get_estimator_log + fi exit 1 } @@ -84,6 +103,7 @@ check_estimator_set_and_init() { restart_model_server() { kubectl delete po -l app.kubernetes.io/component=model-server -n kepler wait_for_server + get_server_log } test() { @@ -94,6 +114,22 @@ test() { for opt in ${DEPLOY_OPTIONS}; do export $opt=true; done; + # train and deploy local modelDB + kubectl apply -f ${top_dir}/manifests/test/file-server.yaml + sleep 10 + wait_for_db + + # patch MODEL_TOPURL environment + if [ ! -z ${ESTIMATOR} ]; then + kubectl patch configmap -n kepler kepler-cfm --type merge -p "$(cat ${top_dir}/manifests/test/patch-estimator-sidecar.yaml)" + kubectl patch ds kepler-exporter -n kepler -p '{"spec":{"template":{"spec":{"containers":[{"name":"estimator","env":[{"name":"MODEL_TOPURL","value":"http://model-db.kepler.svc.cluster.local:8110"}]}]}}}}' + fi + if [ ! -z ${SERVER} ]; then + kubectl patch deploy kepler-model-server -n kepler -p '{"spec":{"template":{"spec":{"containers":[{"name":"server-api","env":[{"name":"MODEL_TOPURL","value":"http://model-db.kepler.svc.cluster.local:8110"}]}]}}}}' + kubectl delete po -n kepler -l app.kubernetes.io/component=model-server + fi + kubectl delete po -n kepler -l app.kubernetes.io/component=exporter + if [ ! -z ${ESTIMATOR} ]; then # with estimator if [ ! -z ${TEST} ]; then diff --git a/tests/estimator_model_request_test.py b/tests/estimator_model_request_test.py index 3bf2dd6f..2c290c88 100644 --- a/tests/estimator_model_request_test.py +++ b/tests/estimator_model_request_test.py @@ -16,6 +16,12 @@ import os import sys +file_server_port = 8110 +# set environment +os.environ['MODEL_SERVER_URL'] = 'http://localhost:8100' +model_topurl = 'http://localhost:{}'.format(file_server_port) +os.environ['MODEL_TOPURL'] = model_topurl + server_path = os.path.join(os.path.dirname(__file__), '../src') util_path = os.path.join(os.path.dirname(__file__), '../src/util') train_path = os.path.join(os.path.dirname(__file__), '../src/train') @@ -28,6 +34,7 @@ sys.path.append(prom_path) sys.path.append(estimate_path) +from http_server import http_file_server from train_types import FeatureGroups, FeatureGroup, ModelOutputType from loader import get_download_output_path from estimate.estimator import handle_request, loaded_model, PowerRequest @@ -38,7 +45,7 @@ from estimator_power_request_test import generate_request -os.environ['MODEL_SERVER_URL'] = 'http://localhost:8100' +http_file_server(file_server_port) import json @@ -72,17 +79,17 @@ if url != "": print("Download: ", url) response = requests.get(url) - if response.status_code == 200: - output_path = get_download_output_path(download_path, energy_source, output_type) - if output_type_name in loaded_model and energy_source in loaded_model[output_type.name]: - del loaded_model[output_type_name][energy_source] - if os.path.exists(output_path): - shutil.rmtree(output_path) - request_json = generate_request(None, n=10, metrics=FeatureGroups[FeatureGroup.Full], output_type=output_type_name) - data = json.dumps(request_json) - output = handle_request(data) - assert len(output['powers']) > 0, "cannot get power {}\n {}".format(output['msg'], request_json) - print("result from {}: {}".format(url, output)) + assert response.status_code == 200, "init url must be set and valid" + output_path = get_download_output_path(download_path, energy_source, output_type) + if output_type_name in loaded_model and energy_source in loaded_model[output_type.name]: + del loaded_model[output_type_name][energy_source] + if os.path.exists(output_path): + shutil.rmtree(output_path) + request_json = generate_request(None, n=10, metrics=FeatureGroups[FeatureGroup.Full], output_type=output_type_name) + data = json.dumps(request_json) + output = handle_request(data) + assert len(output['powers']) > 0, "cannot get power {}\n {}".format(output['msg'], request_json) + print("result from {}: {}".format(url, output)) output_type_name = 'AbsPower' estimator_enable_key = "NODE_COMPONENTS_ESTIMATOR" @@ -99,23 +106,23 @@ if os.path.exists(output_path): shutil.rmtree(output_path) # valid model - os.environ[init_url_key] = get_url(output_type=output_type, feature_group=FeatureGroup.KubeletOnly) + os.environ[init_url_key] = get_url(output_type=output_type, feature_group=FeatureGroup.CgroupOnly, model_topurl=model_topurl) print("Requesting from ", os.environ[init_url_key]) - request_json = generate_request(None, n=10, metrics=FeatureGroups[FeatureGroup.KubeletOnly], output_type=output_type_name) + request_json = generate_request(None, n=10, metrics=FeatureGroups[FeatureGroup.CgroupOnly], output_type=output_type_name) data = json.dumps(request_json) output = handle_request(data) assert len(output['powers']) > 0, "cannot get power {}\n {}".format(output['msg'], request_json) - print("result {}/{} from static set: {}".format(output_type_name, FeatureGroup.KubeletOnly.name, output)) + print("result {}/{} from static set: {}".format(output_type_name, FeatureGroup.CgroupOnly.name, output)) del loaded_model[output_type_name][energy_source] # invalid model - os.environ[init_url_key] = get_url(output_type=output_type, feature_group=FeatureGroup.BPFOnly) + os.environ[init_url_key] = get_url(output_type=output_type, feature_group=FeatureGroup.BPFOnly, model_topurl=model_topurl) print("Requesting from ", os.environ[init_url_key]) - request_json = generate_request(None, n=10, metrics=FeatureGroups[FeatureGroup.KubeletOnly], output_type=output_type_name) + request_json = generate_request(None, n=10, metrics=FeatureGroups[FeatureGroup.CgroupOnly], output_type=output_type_name) data = json.dumps(request_json) power_request = json.loads(data, object_hook = lambda d : PowerRequest(**d)) output_path = get_achived_model(power_request) assert output_path is None, "model should be invalid\n {}".format(output_path) - os.environ['MODEL_CONFIG'] = "{}=true\n{}={}\n".format(estimator_enable_key,init_url_key,get_url(output_type=output_type, feature_group=FeatureGroup.KubeletOnly)) + os.environ['MODEL_CONFIG'] = "{}=true\n{}={}\n".format(estimator_enable_key,init_url_key,get_url(output_type=output_type, feature_group=FeatureGroup.CgroupOnly, model_topurl=model_topurl)) set_env_from_model_config() print("Requesting from ", os.environ[init_url_key]) reset_failed_list() @@ -124,7 +131,7 @@ output_path = get_download_output_path(download_path, energy_source, output_type) if os.path.exists(output_path): shutil.rmtree(output_path) - request_json = generate_request(None, n=10, metrics=FeatureGroups[FeatureGroup.KubeletOnly], output_type=output_type_name) + request_json = generate_request(None, n=10, metrics=FeatureGroups[FeatureGroup.CgroupOnly], output_type=output_type_name) data = json.dumps(request_json) output = handle_request(data) assert len(output['powers']) > 0, "cannot get power {}\n {}".format(output['msg'], request_json) \ No newline at end of file diff --git a/tests/http_server.py b/tests/http_server.py new file mode 100644 index 00000000..822ef56e --- /dev/null +++ b/tests/http_server.py @@ -0,0 +1,46 @@ +# import external src +import http.server +import socketserver +import atexit +import threading + +import os +import sys + +################################################################# +# import internal src +src_path = os.path.join(os.path.dirname(__file__), '..', 'src') +sys.path.append(src_path) +################################################################# + +from util.config import model_toppath + +os.chdir(model_toppath) + +def cleanup_task(server): + print("Shutdown server...") + server.shutdown() + +def get_server(file_server_port): + Handler = http.server.SimpleHTTPRequestHandler + httpd = socketserver.TCPServer(("", file_server_port), Handler) + + # Register the cleanup task to be executed on program exit + atexit.register(cleanup_task, httpd) + + print("Http File Serve Serving at Port", file_server_port, " for ", model_toppath) + return httpd + +def http_file_server(file_server_port): + try: + httpd = get_server(file_server_port) + # Start the server in a separate thread + server_thread = threading.Thread(target=httpd.serve_forever) + server_thread.daemon = True + server_thread.start() + except Exception as err: + print("File server is running: {}".format(err)) + +if __name__ == "__main__": + httpd = get_server(8110) + httpd.serve_forever() \ No newline at end of file diff --git a/tests/minimal_trainer.py b/tests/minimal_trainer.py new file mode 100644 index 00000000..cd2b2e70 --- /dev/null +++ b/tests/minimal_trainer.py @@ -0,0 +1,18 @@ +import os +import sys + +################################################################# +# import internal src +src_path = os.path.join(os.path.dirname(__file__), '..', 'src') +sys.path.append(src_path) +################################################################# + +from pipeline_test import process + +from util import PowerSourceMap, FeatureGroup + +trainer_names = [ 'GradientBoostingRegressorTrainer', 'SGDRegressorTrainer', 'XgboostFitTrainer' ] +valid_feature_groups = [ FeatureGroup.BPFOnly, FeatureGroup.CgroupOnly ] + +if __name__ == '__main__': + process(target_energy_sources=PowerSourceMap.keys(), abs_trainer_names=trainer_names, dyn_trainer_names=trainer_names, valid_feature_groups=valid_feature_groups) \ No newline at end of file diff --git a/tests/pipeline_test.py b/tests/pipeline_test.py index dcdd5a01..81eaded3 100644 --- a/tests/pipeline_test.py +++ b/tests/pipeline_test.py @@ -26,15 +26,21 @@ def assert_pipeline(pipeline, query_results, feature_group, energy_source, energ else: assert_train(trainer, dyn_data, energy_components) -def process(save_pipeline_name=DEFAULT_PIPELINE, prom_save_path=prom_output_path, prom_save_name=prom_output_filename, abs_trainer_names=test_trainer_names, dyn_trainer_names=test_trainer_names, extractors=test_extractors, isolators=test_isolators, target_energy_sources=[test_energy_source]): +def process(save_pipeline_name=DEFAULT_PIPELINE, prom_save_path=prom_output_path, prom_save_name=prom_output_filename, abs_trainer_names=test_trainer_names, dyn_trainer_names=test_trainer_names, extractors=test_extractors, isolators=test_isolators, target_energy_sources=[test_energy_source], valid_feature_groups=None): query_results = get_query_results(save_path=prom_save_path, save_name=prom_save_name) - valid_feature_groups = get_valid_feature_group_from_queries(query_results.keys()) + if valid_feature_groups is None: + valid_feature_groups = get_valid_feature_group_from_queries(query_results.keys()) for extractor in extractors: for isolator in isolators: - energy_components = PowerSourceMap[test_energy_source] pipeline = NewPipeline(save_pipeline_name, abs_trainer_names, dyn_trainer_names, extractor=extractor, isolator=isolator, target_energy_sources=target_energy_sources ,valid_feature_groups=valid_feature_groups) - for feature_group in valid_feature_groups: - assert_pipeline(pipeline, query_results, feature_group, test_energy_source, energy_components) + for energy_source in target_energy_sources: + energy_components = PowerSourceMap[energy_source] + for feature_group in valid_feature_groups: + assert_pipeline(pipeline, query_results, feature_group, energy_source, energy_components) + # save metadata + pipeline.save_metadata() + # save pipeline + pipeline.archive_pipeline() if __name__ == '__main__': - process() \ No newline at end of file + process(target_energy_sources=PowerSourceMap.keys()) \ No newline at end of file diff --git a/tests/trainer_test.py b/tests/trainer_test.py index bfea1007..5e8bc412 100644 --- a/tests/trainer_test.py +++ b/tests/trainer_test.py @@ -3,6 +3,8 @@ import os import sys +import sklearn + ################################################################# # import internal src src_path = os.path.join(os.path.dirname(__file__), '..', 'src') @@ -30,9 +32,12 @@ def assert_train(trainer, data, energy_components): node_type_filtered_data = data[data[node_info_column] == node_type] X_values = node_type_filtered_data[trainer.features].values for component in energy_components: - output = trainer.predict(node_type_str, component, X_values) - assert len(output) == len(X_values), "length of predicted values != features ({}!={})".format(len(output), len(X_values)) - + try: + output = trainer.predict(node_type_str, component, X_values) + assert len(output) == len(X_values), "length of predicted values != features ({}!={})".format(len(output), len(X_values)) + except sklearn.exceptions.NotFittedError: + pass + def process(node_level, feature_group, result, trainer_names=test_trainer_names, energy_source=test_energy_source, power_columns=get_expected_power_columns(), pipeline_name=DEFAULT_PIPELINE): energy_components = PowerSourceMap[energy_source] train_items = []