From 6abed54edd216925d31323628fe9a24bb8a3a639 Mon Sep 17 00:00:00 2001 From: Sunyanan Choochotkaew Date: Thu, 25 Jan 2024 19:27:29 +0900 Subject: [PATCH] add local db, remove kubelet Signed-off-by: Sunyanan Choochotkaew --- .github/workflows/build-push.yml | 135 ++++++++----- ...osted.yml => collect-data-self-hosted.yml} | 30 +-- .github/workflows/collect-train.yml | 40 ++++ ...gration-test.yaml => integration-test.yml} | 40 +++- .github/workflows/push-pr.yml | 180 +++++++++++++++++ .github/workflows/push-to-main.yml | 60 ++++++ .github/workflows/tekton-test.yml | 174 ++++++++++++++++ .github/workflows/train-model-on-release.yml | 44 ----- .../{retrain-model.yml => train-model.yml} | 120 ++++++++---- .github/workflows/train.yml | 44 +++++ .github/workflows/unit-test.yml | 39 ++++ .github/workflows/unittest.yaml | 42 ---- .gitignore | 4 +- Makefile | 10 +- cmd/cmd_util.py | 52 ++--- contributing.md | 20 ++ dockerfiles/Dockerfile.test | 2 + dockerfiles/requirements.txt | 3 +- hack/k8s_helper.sh | 2 +- manifests/test/file-server.yaml | 49 +++++ manifests/test/patch-estimator-sidecar.yaml | 6 + model_training/deployment/kepler.yaml | 2 +- model_training/{s3-pusher => s3}/Dockerfile | 4 +- model_training/{s3-pusher => s3}/README.md | 0 model_training/s3/s3-loader.py | 77 ++++++++ model_training/{s3-pusher => s3}/s3-pusher.py | 56 ++---- model_training/s3/util.py | 36 ++++ model_training/tekton/README.md | 12 +- .../complete-pipelinerun.yaml | 12 ++ .../examples/single-train/abs-power.yaml | 33 ++++ .../examples/single-train/aws-push.yaml | 39 ++++ .../single-train/default.yaml} | 4 +- .../examples/single-train/dyn-power.yaml | 33 ++++ .../examples/single-train/ibmcloud-push.yaml | 39 ++++ .../tekton/examples/test-collect.yaml | 30 +++ .../test-retrain.yaml} | 4 +- .../pipelineruns/dyn-train-pipelinerun.yaml | 19 -- model_training/tekton/pipelines/collect.yaml | 185 ++++++++++++++++++ .../tekton/pipelines/complete-retrain.yaml | 159 +++++++++++++++ .../tekton/pipelines/complete-train.yaml | 5 + .../{retrain.yaml => single-retrain.yaml} | 55 +++++- .../tekton/pipelines/single-train.yaml | 5 + model_training/tekton/push-taskrun.yaml | 16 -- .../tekton/tasks/s3/aws-s3-load.yaml | 58 ++++++ .../aws.yaml => s3/aws-s3-push.yaml} | 2 +- .../tekton/tasks/s3/ibmcloud-s3-load.yaml | 58 ++++++ .../ibmcloud-s3-push.yaml} | 2 +- .../tekton/tasks/stressng-task.yaml | 7 +- src/estimate/estimator.py | 10 +- src/server/model_server.py | 15 +- src/util/__init__.py | 2 +- src/util/config.py | 9 +- src/util/loader.py | 10 +- src/util/train_types.py | 24 +-- tests/e2e_test.sh | 38 +++- tests/estimator_model_request_test.py | 45 +++-- tests/http_server.py | 46 +++++ tests/minimal_trainer.py | 18 ++ tests/pipeline_test.py | 18 +- tests/trainer_test.py | 11 +- 60 files changed, 1907 insertions(+), 387 deletions(-) rename .github/workflows/{train-model-self-hosted.yml => collect-data-self-hosted.yml} (93%) create mode 100644 .github/workflows/collect-train.yml rename .github/workflows/{integration-test.yaml => integration-test.yml} (61%) create mode 100644 .github/workflows/push-pr.yml create mode 100644 .github/workflows/push-to-main.yml create mode 100644 .github/workflows/tekton-test.yml delete mode 100644 .github/workflows/train-model-on-release.yml rename .github/workflows/{retrain-model.yml => train-model.yml} (55%) create mode 100644 .github/workflows/train.yml create mode 100644 .github/workflows/unit-test.yml delete mode 100644 .github/workflows/unittest.yaml create mode 100644 manifests/test/file-server.yaml 
create mode 100644 manifests/test/patch-estimator-sidecar.yaml rename model_training/{s3-pusher => s3}/Dockerfile (53%) rename model_training/{s3-pusher => s3}/README.md (100%) create mode 100644 model_training/s3/s3-loader.py rename model_training/{s3-pusher => s3}/s3-pusher.py (51%) create mode 100644 model_training/s3/util.py rename model_training/tekton/{pipelineruns => examples}/complete-pipelinerun.yaml (55%) create mode 100644 model_training/tekton/examples/single-train/abs-power.yaml create mode 100644 model_training/tekton/examples/single-train/aws-push.yaml rename model_training/tekton/{pipelineruns/kepler-default.yaml => examples/single-train/default.yaml} (83%) create mode 100644 model_training/tekton/examples/single-train/dyn-power.yaml create mode 100644 model_training/tekton/examples/single-train/ibmcloud-push.yaml create mode 100644 model_training/tekton/examples/test-collect.yaml rename model_training/tekton/{pipelineruns/abs-train-pipelinerun.yaml => examples/test-retrain.yaml} (82%) delete mode 100644 model_training/tekton/pipelineruns/dyn-train-pipelinerun.yaml create mode 100644 model_training/tekton/pipelines/collect.yaml create mode 100644 model_training/tekton/pipelines/complete-retrain.yaml rename model_training/tekton/pipelines/{retrain.yaml => single-retrain.yaml} (80%) delete mode 100644 model_training/tekton/push-taskrun.yaml create mode 100644 model_training/tekton/tasks/s3/aws-s3-load.yaml rename model_training/tekton/tasks/{s3-pusher/aws.yaml => s3/aws-s3-push.yaml} (95%) create mode 100644 model_training/tekton/tasks/s3/ibmcloud-s3-load.yaml rename model_training/tekton/tasks/{s3-pusher/ibmcloud.yaml => s3/ibmcloud-s3-push.yaml} (95%) create mode 100644 tests/http_server.py create mode 100644 tests/minimal_trainer.py diff --git a/.github/workflows/build-push.yml b/.github/workflows/build-push.yml index 48826215..4d38573e 100644 --- a/.github/workflows/build-push.yml +++ b/.github/workflows/build-push.yml @@ -1,30 +1,79 @@ -name: BuildPushDeployImage +name: Build-Push Kepler Model Server Image on: - push: - branches: [ main ] + workflow_call: + secrets: + docker_username: + description: 'Docker username' + required: false + docker_password: + description: 'Docker password' + required: false + inputs: + base_change: + description: 'Change flag on base image' + required: true + type: string + image_repo: + description: 'The image repo to use' + required: true + type: string + image_tag: + description: 'The image tag to use' + required: true + type: string + push: + description: 'Push image' + required: false + type: string + default: false + + +env: + base_change: ${{ inputs.base_change }} + base_image: ${{ inputs.image_repo }}/kepler_model_server_base:${{ inputs.image_tag }} + image: ${{ inputs.image_repo }}/kepler_model_server:${{ inputs.image_tag }} jobs: - changes: + check-secret: runs-on: ubuntu-latest + outputs: - src: ${{ steps.changes.outputs.src }} + available: ${{ steps.check-secret.outputs.available }} + steps: - - uses: actions/checkout@v4 - - uses: dorny/paths-filter@v2 - id: changes - with: - filters: | - src: - - 'dockerfiles/requirements.txt' - - 'dockerfiles/Dockerfile.base' - - '.github/workflows/build-push.yml' + - name: Check Secret + id: check-secret + env: + SECRET: ${{ secrets.docker_password }} + run: | + if [ "$SECRET" == "" ]; then + echo "available=false" >> "$GITHUB_OUTPUT" + else + echo "available=true" >> "$GITHUB_OUTPUT" + fi + + + check-base-exist: + runs-on: ubuntu-latest - buildbase: - needs: changes - if: ${{ 
needs.changes.outputs.src == 'true' }} + outputs: + exists: ${{ steps.check-base-exist.outputs.exists }} + + steps: + - name: Check if Docker base image exists + id: check-base-exist + run: | + if docker pull ${{ env.base_image }}; then + echo "exists=true" >> "$GITHUB_OUTPUT" + else + echo "exists=false" >> "$GITHUB_OUTPUT" + fi + + build: runs-on: ubuntu-latest + needs: [check-secret, check-base-exist] steps: - name: checkout uses: actions/checkout@v4 @@ -32,53 +81,31 @@ jobs: uses: docker/setup-qemu-action@v3 - name: set up Docker Buildx uses: docker/setup-buildx-action@v3 - - name: Login to Quay - if: ${{ (github.repository_owner == 'sustainable-computing-io') && (github.ref == 'refs/heads/main') }} + - name: Login to Docker + if: ${{ needs.check-secret.outputs.available == 'true' }} uses: docker/login-action@v3 with: - registry: quay.io/sustainable_computing_io - username: ${{ secrets.BOT_NAME }} - password: ${{ secrets.BOT_TOKEN }} - - name: Build and push base image + registry: ${{ inputs.image_repo }} + username: ${{ secrets.docker_username }} + password: ${{ secrets.docker_password }} + - name: Build-push base image + if: ${{ (needs.check-secret.outputs.available == 'true') && ((needs.check-base-exist.outputs.exists == 'false') || (env.base_change == 'true')) }} uses: docker/build-push-action@v5 with: context: dockerfiles platforms: linux/amd64 push: true - tags: quay.io/sustainable_computing_io/kepler_model_server_base:v0.7 + tags: ${{ env.base_image }} file: dockerfiles/Dockerfile.base - - name: Build and push Docker image - uses: docker/build-push-action@v5 - with: - context: . - platforms: linux/amd64 - push: true - tags: quay.io/sustainable_computing_io/kepler_model_server:latest,quay.io/sustainable_computing_io/kepler_model_server:v0.7 - file: dockerfiles/Dockerfile - - build: - needs: [changes] - if: ${{ needs.changes.outputs.src == 'false' }} - runs-on: ubuntu-latest - steps: - - name: checkout - uses: actions/checkout@v4 - - name: set up QEMU - uses: docker/setup-qemu-action@v3 - - name: set up Docker Buildx - uses: docker/setup-buildx-action@v3 - - name: Login to Quay - if: ${{ (github.repository_owner == 'sustainable-computing-io') && (github.ref == 'refs/heads/main') }} - uses: docker/login-action@v3 - with: - registry: quay.io/sustainable_computing_io - username: ${{ secrets.BOT_NAME }} - password: ${{ secrets.BOT_TOKEN }} - - name: Build and push Docker image + - name: Replace value in file + if: ${{ (needs.check-secret.outputs.available == 'true') && ((needs.check-base-exist.outputs.exists == 'false') || (env.base_change == 'true')) }} + run: | + sed -i "s|quay.io/sustainable_computing_io/kepler_model_server_base:v0.7|${{ env.base_image }}|" dockerfiles/Dockerfile + - name: Build-push image uses: docker/build-push-action@v5 with: context: . 
platforms: linux/amd64 - push: true - tags: quay.io/sustainable_computing_io/kepler_model_server:latest,quay.io/sustainable_computing_io/kepler_model_server:v0.7 + push: ${{ inputs.push }} + tags: ${{ env.image }} file: dockerfiles/Dockerfile diff --git a/.github/workflows/train-model-self-hosted.yml b/.github/workflows/collect-data-self-hosted.yml similarity index 93% rename from .github/workflows/train-model-self-hosted.yml rename to .github/workflows/collect-data-self-hosted.yml index ccf5d4d9..2bdf0fe2 100644 --- a/.github/workflows/train-model-self-hosted.yml +++ b/.github/workflows/collect-data-self-hosted.yml @@ -1,4 +1,4 @@ -name: Reusable workflow example +name: Self-hosted Collect Data Workflow on: workflow_call: @@ -31,10 +31,13 @@ on: description: 'The instance type to use for the EC2 instance' required: true type: string + model_server_image: + description: 'Kepler Model Server image' + required: true + type: string env: KUBECONFIG: /root/.kube/config - VERSION: 0.7 jobs: setup-runner: @@ -69,8 +72,8 @@ jobs: echo "instance_ip ${{ steps.create-runner.outputs.instance_ip }}" echo "runner_name ${{ steps.create-runner.outputs.runner_name }}" whoami - train-power-model: - name: Train Power Model + collect-data: + name: Collect Data needs: setup-runner runs-on: [self-hosted, linux, x64] @@ -121,6 +124,7 @@ jobs: kubectl apply --filename https://storage.googleapis.com/tekton-releases/pipeline/latest/release.yaml ./hack/k8s_helper.sh rollout_ns_status tekton-pipelines ./hack/k8s_helper.sh rollout_ns_status tekton-pipelines-resolvers + - name: Prepare PVC working-directory: model_training/tekton run: | @@ -145,13 +149,13 @@ jobs: kubectl apply -f tasks kubectl apply -f tasks/s3-pusher kubectl apply -f pipelines - - name: Run Tekton Pipeline + - name: Run Tekton Pipeline with S3Push run: | cat <> "$GITHUB_OUTPUT" + else + if [ "$BRANCH" == "main" ]; then + echo "tag=v0.7" >> "$GITHUB_OUTPUT" + else + echo "tag=$COMMIT" >> "$GITHUB_OUTPUT" + fi + fi + + check-change: + runs-on: ubuntu-latest + + outputs: + base: ${{ steps.filter.outputs.base }} + data: ${{ steps.filter.outputs.data }} + modeling: ${{ steps.filter.outputs.modeling }} + + steps: + - uses: actions/checkout@v4 + - uses: dorny/paths-filter@v3 + id: filter + with: + filters: | + base: + - 'dockerfiles/requirements.txt' + - 'dockerfiles/Dockerfile.base' + - '.github/workflows/build-push.yml' + data: + - 'src/util/prom_types.py' + - 'src/util/train_types.py' + - 'src/train/prom/**' + - 'model_training/tekton/tasks/stressng-task.yaml' + - 'model_training/tekton/pipelines/collect.yaml' + - 'hack/**' + - '.github/workflows/collect-data-self-hosted.yml' + modeling: + - 'src/**' + - 'model_training/**' + - 'hack/**' + - '.github/workflows/collect-data-self-hosted.yml' + - '.github/workflows/train-model.yml' + + check-secret: + runs-on: ubuntu-latest + + outputs: + docker-secret: ${{ steps.check-docker-secret.outputs.available }} + + steps: + - name: Check Docker Secret + id: check-docker-secret + env: + DOCKER_SECRET: ${{ secrets.BOT_TOKEN}} + run: | + if [ "$DOCKER_SECRET" == "" ]; then + echo "available=false" >> "$GITHUB_OUTPUT" + else + echo "available=true" >> "$GITHUB_OUTPUT" + fi + + check-base-exist: + runs-on: ubuntu-latest + + outputs: + exists: ${{ steps.check-base-exist.outputs.exists }} + + steps: + - name: Check if Docker base image exists + id: check-base-exist + run: | + if docker pull ${{ env.BASE_IMAGE }}; then + echo "exists=true" >> "$GITHUB_OUTPUT" + else + echo "exists=false" >> "$GITHUB_OUTPUT" + fi + + 
unit-test: + needs: [check-change] + uses: ./.github/workflows/unit-test.yml + with: + base_change: ${{ needs.check-change.outputs.base }} + + base-image: + if: ${{ (needs.check-secret.outputs.docker-secret == 'true') && ((needs.check-base-exist.outputs.exists == 'false') || (needs.check-change.outputs.base == 'true')) }} + needs: [check-base-exist, check-branch, check-secret, check-change] + runs-on: ubuntu-latest + + outputs: + change: ${{ steps.record.outputs.change }} + + env: + tag: ${{ needs.check-branch.outputs.tag }} + + steps: + - name: checkout + uses: actions/checkout@v4 + - name: set up QEMU + uses: docker/setup-qemu-action@v3 + - name: set up Docker Buildx + uses: docker/setup-buildx-action@v3 + - name: Login to Docker + uses: docker/login-action@v3 + with: + registry: ${{ vars.IMAGE_REPO || 'docker.io/library' }} + username: ${{ secrets.BOT_NAME }} + password: ${{ secrets.BOT_TOKEN }} + - name: Build-push base image + uses: docker/build-push-action@v5 + with: + context: dockerfiles + push: true + tags: ${{ vars.IMAGE_REPO || 'docker.io/library' }}/kepler_model_server_base:${{ needs.check-branch.outputs.tag }} + file: dockerfiles/Dockerfile.base + - name: Record change + id: record + run: | + echo "change=true" >> "$GITHUB_OUTPUT" + + tekton-test: + needs: [check-branch, base-image] + # the changed base-image has been pushed or no change + if: ${{ needs.base-image.outputs.change == 'true' || needs.check-change.outputs.base != 'true' }} + uses: ./.github/workflows/tekton-test.yml + with: + base_change: ${{ needs.base-image.outputs.change }} + image_repo: ${{ vars.IMAGE_REPO || 'docker.io/library' }} + image_tag: ${{ needs.check-branch.outputs.tag }} + pipeline_name: std_v0.7 + secrets: + docker_username: ${{ secrets.BOT_NAME }} + docker_password: ${{ secrets.BOT_TOKEN }} + + integration-test: + needs: [check-branch, base-image] + # the changed base-image has been pushed or no change + if: ${{ needs.base-image.outputs.change == 'true' || needs.check-change.outputs.base != 'true' }} + uses: ./.github/workflows/integration-test.yml + with: + base_change: ${{ needs.base-image.outputs.change }} + image_repo: ${{ vars.IMAGE_REPO || 'docker.io/library' }} + image_tag: ${{ needs.check-branch.outputs.tag }} + secrets: + docker_username: ${{ secrets.BOT_NAME }} + docker_password: ${{ secrets.BOT_TOKEN }} \ No newline at end of file diff --git a/.github/workflows/push-to-main.yml b/.github/workflows/push-to-main.yml new file mode 100644 index 00000000..510b2295 --- /dev/null +++ b/.github/workflows/push-to-main.yml @@ -0,0 +1,60 @@ +on: + push: + branches: + - main + +jobs: + check-change: + runs-on: ubuntu-latest + + outputs: + base: ${{ steps.filter.outputs.base }} + modeling: ${{ steps.filter.outputs.modeling }} + + steps: + - uses: actions/checkout@v4 + - uses: dorny/paths-filter@v3 + id: filter + with: + filters: | + base: + - 'dockerfiles/requirements.txt' + - 'dockerfiles/Dockerfile.base' + - '.github/workflows/build-push.yml' + modeling: + - 'src/**' + - 'model_training/**' + - 'hack/**' + - '.github/workflows/train-model.yml' + + build-push: + needs: [check-change] + uses: ./.github/workflows/build-push.yml + with: + base_change: ${{ needs.check-change.outputs.base }} + image_repo: ${{ vars.IMAGE_REPO }} + image_tag: v0.7 + push: true + secrets: + docker_username: ${{ secrets.BOT_NAME }} + docker_password: ${{ secrets.BOT_TOKEN }} + + train-model: + needs: [check-change, build-push] + if: ${{ needs.check-change.outputs.modeling == 'true' }} + strategy: + matrix: + instance_type: 
[i3.metal] + uses: ./.github/workflows/train-model.yml + with: + pipeline_name: std_v0.7 + instance_type: ${{ matrix.instance_type }} + ami_id: 'ami-0e4d0bb9670ea8db0' + github_repo: ${{ github.repository }} + model_server_image: ${{ vars.IMAGE_REPO }}/kepler-model-server:v0.7 + trainers: LogisticRegressionTrainer,ExponentialRegressionTrainer,SGDRegressorTrainer,GradientBoostingRegressorTrainer,XgboostFitTrainer + secrets: + self_hosted_github_token: ${{ secrets.GH_SELF_HOSTED_RUNNER_TOKEN }} + aws_access_key_id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws_secret_access_key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws_region: ${{ secrets.AWS_REGION }} \ No newline at end of file diff --git a/.github/workflows/tekton-test.yml b/.github/workflows/tekton-test.yml new file mode 100644 index 00000000..f137d1de --- /dev/null +++ b/.github/workflows/tekton-test.yml @@ -0,0 +1,174 @@ +name: Tekton Test + +on: + workflow_call: + secrets: + docker_username: + description: 'Docker username' + required: false + docker_password: + description: 'Docker password' + required: false + inputs: + base_change: + description: 'Change flag on base image' + required: true + type: string + image_repo: + description: 'The image repo to use' + required: true + type: string + image_tag: + description: 'The image tag to use' + required: true + type: string + pipeline_name: + description: 'Pipeline name' + required: true + type: string + output_type: + description: 'Model output type (AbsPower, DynPower)' + required: false + type: string + default: AbsPower + +env: + BASE_IMAGE: ${{ inputs.image_repo }}/kepler_model_server_base:${{ inputs.image_tag }} + IMAGE: ${{ inputs.image_repo }}/kepler_model_server:${{ inputs.image_tag }} + +jobs: + tekton-test: + runs-on: ubuntu-latest + steps: + - name: checkout + uses: actions/checkout@v4 + - name: set up QEMU + uses: docker/setup-qemu-action@v3 + - name: set up Docker Buildx + uses: docker/setup-buildx-action@v3 + - name: Login to Docker + if: ${{ inputs.base_change == 'true' }} + uses: docker/login-action@v3 + with: + registry: ${{ inputs.image_repo }} + username: ${{ secrets.docker_username }} + password: ${{ secrets.docker_password }} + - name: Replace value in file + if: ${{ inputs.base_change == 'true' }} + run: | + sed -i "s|quay.io/sustainable_computing_io/kepler_model_server_base:v0.7|${{ env.BASE_IMAGE }}|" dockerfiles/Dockerfile + + - name: Build image + run: | + docker build -t $IMAGE -f dockerfiles/Dockerfile . 
+ + - name: Prepare Cluster + working-directory: model_training + run: | + ./script.sh cluster_up + cp $HOME/bin/kubectl /usr/local/bin/kubectl + kubectl get po -A + + - name: Load built image to cluster + run: | + kind load docker-image $IMAGE --name=kind-for-training + + - name: Install Kepler + working-directory: model_training + run: | + ./script.sh deploy_kepler + ./script.sh deploy_prom_dependency + podname=$(kubectl get pods -oname -nkepler) + echo $podname + kubectl get $podname -n kepler -oyaml + kubectl logs $podname -n kepler + kubectl get $podname -n kepler + + - name: Install Tekton + run: | + kubectl apply --filename https://storage.googleapis.com/tekton-releases/pipeline/latest/release.yaml + ./hack/k8s_helper.sh rollout_ns_status tekton-pipelines + ./hack/k8s_helper.sh rollout_ns_status tekton-pipelines-resolvers + + - name: Prepare PVC + working-directory: model_training/tekton + run: | + kubectl apply -f pvc/hostpath.yaml + + - name: Deploy Tasks and Pipelines + working-directory: model_training/tekton + run: | + kubectl apply -f tasks + kubectl apply -f tasks/s3 + kubectl apply -f pipelines + + - name: Run Tekton Collect + run: | + cat <> "$GITHUB_OUTPUT" - else - echo "available=true" >> "$GITHUB_OUTPUT" - fi - - train-model: - if: ${{ needs.check-secret.outputs.available == 'true' }} - strategy: - matrix: - instance_type: [i3.metal, c6i.metal] - max-parallel: 1 - - uses: ./.github/workflows/train-model-self-hosted.yml - with: - instance_type: ${{ matrix.instance_type }} - ami_id: 'ami-0e4d0bb9670ea8db0' - github_repo: ${{ github.repository }} - secrets: - self_hosted_github_token: ${{ secrets.GH_SELF_HOSTED_RUNNER_TOKEN }} - aws_access_key_id: ${{ secrets.AWS_ACCESS_KEY_ID }} - aws_secret_access_key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} - security_group_id: ${{ secrets.AWS_SECURITY_GROUP_ID }} - aws_region: ${{ secrets.AWS_REGION }} diff --git a/.github/workflows/retrain-model.yml b/.github/workflows/train-model.yml similarity index 55% rename from .github/workflows/retrain-model.yml rename to .github/workflows/train-model.yml index e18ff08b..8c0d2615 100644 --- a/.github/workflows/retrain-model.yml +++ b/.github/workflows/train-model.yml @@ -1,20 +1,68 @@ -name: Retrain Power on GitHub Runner +name: Train Power Model on GitHub Runner on: - push: - paths: - - src - - hack - - model_training - - .github/workflows/retrain-model.yml + workflow_call: + secrets: + self_hosted_github_token: + description: 'The GitHub token to use' + required: true + aws_access_key_id: + description: 'The AWS access key id to use' + required: true + aws_secret_access_key: + description: 'The AWS secret access key to use' + required: true + aws_region: + description: 'The AWS region to use' + required: true + inputs: + pipeline_name: + description: 'Pipeline name' + required: true + type: string + github_repo: + description: 'The GitHub repo to use' + required: true + type: string + ami_id: + description: 'The AMI ID to use for the EC2 instance' + required: true + type: string + instance_type: + description: 'The instance type to use for the EC2 instance' + required: true + type: string + model_server_image: + description: 'Kepler Model Server image' + required: true + type: string + output_type: + description: 'Model output type (AbsPower, DynPower)' + required: false + type: string + default: AbsPower + energy_source: + description: 'Target energy source' + required: false + type: string + default: intel_rapl + feature_group: + description: 'Target feature group' + required: false + type: 
string + default: BPFOnly + trainers: + description: 'Model trainer list (delimit: ,)' + required: false + type: string + default: XgboostFitTrainer env: - AMI_ID: 'ami-0e4d0bb9670ea8db0' - VERSION: 0.7 - AWS_REGION: ${{ secrets.AWS_REGION }} - AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} - AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} - KIND_CLUSTER_NAME: kind-for-training + AMI_ID: ${{ inputs.ami_id }} + AWS_REGION: ${{ secrets.aws_region }} + AWS_ACCESS_KEY_ID: ${{ secrets.aws_access_key_id }} + AWS_SECRET_ACCESS_KEY: ${{ secrets.aws_secret_access_key }} + KIND_CLUSTER_NAME: kind-for-training jobs: check-data: @@ -24,11 +72,6 @@ jobs: outputs: available: ${{ steps.check-data.outputs.available }} - strategy: - matrix: - instance_type: [i3.metal, c6i.metal] - max-parallel: 1 - steps: - name: Checkout id: checkout @@ -48,7 +91,7 @@ jobs: if [ "$AWS_SECRET_ACCESS_KEY" == "" ]; then echo "available=false" >> "$GITHUB_OUTPUT" else - if ./hack/aws_helper.sh check_data ${{ matrix.instance_type }}-${AMI_ID}; then + if ./hack/aws_helper.sh check_data ${{ inputs.instance_type }}-${AMI_ID}; then echo "available=true" >> "$GITHUB_OUTPUT" else echo "available=false" >> "$GITHUB_OUTPUT" @@ -57,13 +100,9 @@ jobs: retrain-model: needs: check-data - if: ${{ needs.check-data.outputs.available == 'true' }} + if: ${{ needs.check-data.outputs.available == 'true' }} runs-on: ubuntu-latest - strategy: - matrix: - instance_type: [c6i.metal, i3.metal] - steps: - name: Checkout id: checkout @@ -82,14 +121,6 @@ jobs: cp $HOME/bin/kubectl /usr/local/bin/kubectl kubectl get po -A - - name: Load data - run: | - export HOST_MNT_PATH=$(pwd)/mnt - ./hack/aws_helper.sh load_data ${{ matrix.instance_type }}-${AMI_ID} - docker exec --privileged "${KIND_CLUSTER_NAME}"-control-plane mkdir -p /mnt/data - docker cp $(pwd)/mnt/data/idle.json "${KIND_CLUSTER_NAME}"-control-plane:/mnt/data/idle.json - docker cp $(pwd)/mnt/data/kepler_query.json "${KIND_CLUSTER_NAME}"-control-plane:/mnt/data/kepler_query.json - - name: Install Tekton run: | kubectl apply --filename https://storage.googleapis.com/tekton-releases/pipeline/latest/release.yaml @@ -120,16 +151,16 @@ jobs: working-directory: model_training/tekton run: | kubectl apply -f tasks - kubectl apply -f tasks/s3-pusher + kubectl apply -f tasks/s3 kubectl apply -f pipelines - - name: Run Tekton Pipeline + - name: Run Tekton Pipeline with S3Push run: | cat < estimator run-estimator: - $(CTR_CMD) run -d --platform linux/amd64 -p 8100:8100 --name estimator $(TEST_IMAGE) python3.8 src/estimate/estimator.py + $(CTR_CMD) run -d --platform linux/amd64 -e "MODEL_TOPURL=http://localhost:8110" -v ${MODEL_PATH}:/mnt/models -p 8100:8100 --name estimator $(TEST_IMAGE) /bin/bash -c "python3.8 tests/http_server.py & sleep 5 && python3.8 src/estimate/estimator.py" run-collector-client: $(CTR_CMD) exec estimator /bin/bash -c "while [ ! 
-S "/tmp/estimator.sock" ]; do sleep 1; done; python3.8 -u ./tests/estimator_power_request_test.py" @@ -46,7 +48,7 @@ test-estimator: run-estimator run-collector-client clean-estimator # test estimator --> model-server run-model-server: - $(CTR_CMD) run -d --platform linux/amd64 -p 8100:8100 --name model-server $(TEST_IMAGE) python3.8 src/server/model_server.py + $(CTR_CMD) run -d --platform linux/amd64 -e "MODEL_TOPURL=http://localhost:8110" -v ${MODEL_PATH}:/mnt/models -p 8100:8100 --name model-server $(TEST_IMAGE) /bin/bash -c "python3.8 tests/http_server.py & sleep 5 && python3.8 src/server/model_server.py" sleep 5 run-estimator-client: diff --git a/cmd/cmd_util.py b/cmd/cmd_util.py index f3152713..c72f33a9 100644 --- a/cmd/cmd_util.py +++ b/cmd/cmd_util.py @@ -10,8 +10,8 @@ src_path = os.path.join(os.path.dirname(__file__), '..', 'src') sys.path.append(src_path) -from util.prom_types import node_info_column, prom_responses_to_results -from util.train_types import ModelOutputType, FeatureGroup +from util.prom_types import node_info_column, prom_responses_to_results, SOURCE_COL, energy_component_to_query +from util.train_types import ModelOutputType, FeatureGroup, PowerSourceMap from util.loader import load_json, get_pipeline_path from util.saver import assure_path, save_csv @@ -55,14 +55,15 @@ def summary_validation(validate_df): items = [] metric_to_validate_pod = { "cgroup": "kepler_container_cgroupfs_cpu_usage_us_total", - # "hwc": "kepler_container_cpu_instructions_total", - "hwc": "kepler_container_cpu_cycles_total", + # CPU instruction is mainly used for ratio. + # reference: https://github.com/sustainable-computing-io/kepler/blob/0b328cf7c79db9a11426fb80a1a922383e40197c/pkg/config/config.go#L92 + "hwc": "kepler_container_cpu_instructions_total", "kubelet": "kepler_container_kubelet_cpu_usage_total", "bpf": "kepler_container_bpf_cpu_time_us_total", } metric_to_validate_power = { - "rapl": "kepler_node_package_joules_total", - "platform": "kepler_node_platform_joules_total" + "intel_rapl": "kepler_node_package_joules_total", + "acpi": "kepler_node_platform_joules_total" } for metric, query in metric_to_validate_pod.items(): target_df = validate_df[validate_df["query"]==query] @@ -166,28 +167,31 @@ def get_validate_df(data_path, benchmark_filename, query_response): item["total"] = filtered_df[query].max() items += [item] energy_queries = [query for query in query_results.keys() if "_joules" in query] - for query in energy_queries: - df = query_results[query] - if len(df) == 0: - # set validate item // no value + for energy_source, energy_components in PowerSourceMap.items(): + for component in energy_components: + query = energy_component_to_query(component) + df = query_results[query] + df = df[df[SOURCE_COL] == energy_source] + if len(df) == 0: + # set validate item // no value + item = dict() + item["pod"] = "" + item["scenarioID"] = energy_source + item["query"] = query + item["count"] = 0 + item[">0"] = 0 + item["total"] = 0 + items += [item] + continue + # set validate item item = dict() item["pod"] = "" - item["scenarioID"] = "" + item["scenarioID"] = energy_source item["query"] = query - item["count"] = 0 - item[">0"] = 0 - item["total"] = 0 + item["count"] = len(df) + item[">0"] = len(df[df[query] > 0]) + item["total"] = df[query].max() items += [item] - continue - # set validate item - item = dict() - item["pod"] = "" - item["scenarioID"] = "" - item["query"] = query - item["count"] = len(df) - item[">0"] = len(df[df[query] > 0]) - item["total"] = df[query].max() - items 
+= [item] other_queries = [query for query in query_results.keys() if query not in container_queries and query not in energy_queries] for query in other_queries: df = query_results[query] diff --git a/contributing.md b/contributing.md index fe1c5791..014c2b48 100644 --- a/contributing.md +++ b/contributing.md @@ -3,6 +3,18 @@ - The main source codes are in [src directory](./src/). +## PR Hands-on + +- Create related [issue](https://github.com/sustainable-computing-io/kepler-model-server/issues) with your name assigned first (if not exist). + +- Set required secret and environment for local repository test if needed. Check below table. + +Objective|Required Secret|Required Environment +---|---|--- +Push to private repo|BOT_NAME, BOT_TOKEN|IMAGE_REPO +Change on base image|BOT_NAME, BOT_TOKEN|IMAGE_REPO +Save data/models to AWS COS|AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY,AWS_REGION| + ## Improve components in training pipelines Learn more details about [Training Pipeline](https://sustainable-computing.io/kepler_model_server/pipeline/) @@ -26,8 +38,12 @@ Learn more details about [model training](./model_training/) The new benchmark must be supported by [CPE operator](https://github.com/IBM/cpe-operator) for automation. Find [examples](https://github.com/IBM/cpe-operator/tree/main/examples). +### CPE-based (deprecated) `Benchmark` CR has a dependency on `BenchmarkOperator`. Default `BechmarkOperator` is to support [batch/v1/Job API](https://github.com/IBM/cpe-operator/blob/main/examples/none/cpe_v1_none_operator.yaml). +### Tekton +Create workload `Task` and provide example `Pipeline` to run. + ### Add new trained models TBD @@ -36,3 +52,7 @@ Any improvement in `src` and `cmd`. ## Test and CI improvement Any improvement in `tests`, `dockerfiles`, `manifests` and `.github/workflows` + +## Documentation + +Detailed documentation should be posted to [kepler-doc](https://github.com/sustainable-computing-io/kepler-doc) repository. 
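+
+As a starting point for the Tekton section above, a minimal workload `Task` could look like the sketch below. This is illustrative only: the task name, image, and script are placeholders, not part of this repository; the `stress-end-time` result only mirrors the convention used by the existing `run-stressng` task.
+
+```yaml
+apiVersion: tekton.dev/v1
+kind: Task
+metadata:
+  name: run-my-workload   # placeholder name
+spec:
+  params:
+    - name: TIMEOUT
+      description: Workload duration in seconds
+      default: "20"
+  results:
+    - name: stress-end-time
+      description: Time recorded after the workload finishes (consumed by the collect step)
+  steps:
+    - name: run
+      image: quay.io/example/my-workload:latest   # placeholder image
+      script: |
+        #!/usr/bin/env bash
+        # run the workload for the requested duration (placeholder command)
+        timeout $(params.TIMEOUT) ./run-workload.sh || true
+        # record the end time for the metric-collection task
+        echo -n $(date +%Y-%m-%dT%H:%M:%SZ) > $(results.stress-end-time.path)
+```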
\ No newline at end of file diff --git a/dockerfiles/Dockerfile.test b/dockerfiles/Dockerfile.test index 93791930..e6fb2db7 100644 --- a/dockerfiles/Dockerfile.test +++ b/dockerfiles/Dockerfile.test @@ -15,6 +15,8 @@ RUN mkdir -p tests/data COPY tests/data/prom_output tests/data/prom_output COPY tests/*.py tests/ +RUN mkdir -p /mnt/models + # port for Model Server EXPOSE 8100 # port for Online Trainer (TODO: reserved for event-based online training) diff --git a/dockerfiles/requirements.txt b/dockerfiles/requirements.txt index 96f908b4..ac78f4df 100644 --- a/dockerfiles/requirements.txt +++ b/dockerfiles/requirements.txt @@ -11,4 +11,5 @@ scipy==1.9.1 xgboost==2.0.1 scikit-learn==1.1.2 py-cpuinfo==9.0.0 -seaborn==0.12.2 \ No newline at end of file +seaborn==0.12.2 +psutil==5.9.8 \ No newline at end of file diff --git a/hack/k8s_helper.sh b/hack/k8s_helper.sh index 29251dfe..75cc5427 100755 --- a/hack/k8s_helper.sh +++ b/hack/k8s_helper.sh @@ -72,7 +72,7 @@ wait_for_pipelinerun() { name=$1 namespace=default - if kubectl get taskruns|grep run-stressng; then + if kubectl get taskruns|grep ${name}-run-stressng; then value=$(_get_succeed_condition $resource $name $namespace) while [ "$value" == "Unknown" ] ; do diff --git a/manifests/test/file-server.yaml b/manifests/test/file-server.yaml new file mode 100644 index 00000000..98df4061 --- /dev/null +++ b/manifests/test/file-server.yaml @@ -0,0 +1,49 @@ +apiVersion: v1 +kind: Pod +metadata: + name: model-db + namespace: kepler + labels: + app.kubernetes.io/component: model-db +spec: + containers: + - name: file-server + image: localhost:5001/kepler_model_server:devel-test + imagePullPolicy: IfNotPresent + command: [ "/bin/sh" ] + args: ["-c", "python3.8 -u tests/http_server.py"] + ports: + - containerPort: 8110 + name: http + volumeMounts: + - name: mnt + mountPath: /mnt + initContainers: + - name: trainer + image: localhost:5001/kepler_model_server:devel-test + imagePullPolicy: IfNotPresent + command: [ "/bin/sh" ] + args: ["-c", "python3.8 -u tests/minimal_trainer.py"] + volumeMounts: + - name: mnt + mountPath: /mnt + # Add other init container configurations here + volumes: + - name: mnt + emptyDir: {} +--- +kind: Service +apiVersion: v1 +metadata: + name: model-db + namespace: kepler + labels: + app.kubernetes.io/component: model-db +spec: + clusterIP: None + selector: + app.kubernetes.io/component: model-db + ports: + - name: http + port: 8110 + targetPort: http \ No newline at end of file diff --git a/manifests/test/patch-estimator-sidecar.yaml b/manifests/test/patch-estimator-sidecar.yaml new file mode 100644 index 00000000..d93e9883 --- /dev/null +++ b/manifests/test/patch-estimator-sidecar.yaml @@ -0,0 +1,6 @@ +data: + MODEL_CONFIG: | + NODE_COMPONENTS_ESTIMATOR=true + NODE_COMPONENTS_INIT_URL=http://model-db.kepler.svc.cluster.local:8110/std_v0.7/intel_rapl/AbsPower/BPFOnly/GradientBoostingRegressorTrainer_1.zip + NODE_TOTAL_ESTIMATOR=true + NODE_TOTAL_INIT_URL=http://model-db.kepler.svc.cluster.local:8110/std_v0.7/acpi/AbsPower/BPFOnly/GradientBoostingRegressorTrainer_1.zip \ No newline at end of file diff --git a/model_training/deployment/kepler.yaml b/model_training/deployment/kepler.yaml index e1693fcc..d89bf6cf 100644 --- a/model_training/deployment/kepler.yaml +++ b/model_training/deployment/kepler.yaml @@ -204,7 +204,7 @@ spec: valueFrom: fieldRef: fieldPath: spec.nodeName - image: quay.io/sustainable_computing_io/kepler:release-0.7.1 + image: quay.io/sustainable_computing_io/kepler:latest imagePullPolicy: IfNotPresent 
livenessProbe: failureThreshold: 5 diff --git a/model_training/s3-pusher/Dockerfile b/model_training/s3/Dockerfile similarity index 53% rename from model_training/s3-pusher/Dockerfile rename to model_training/s3/Dockerfile index f1930404..b2123d28 100644 --- a/model_training/s3-pusher/Dockerfile +++ b/model_training/s3/Dockerfile @@ -2,8 +2,8 @@ FROM python:3.8-slim WORKDIR /usr/local -COPY s3-pusher.py /usr/local +COPY . /usr/local RUN pip install boto3 ibm-cos-sdk -ENTRYPOINT ["python3.8", "s3-pusher.py"] \ No newline at end of file +ENTRYPOINT ["python3.8"] \ No newline at end of file diff --git a/model_training/s3-pusher/README.md b/model_training/s3/README.md similarity index 100% rename from model_training/s3-pusher/README.md rename to model_training/s3/README.md diff --git a/model_training/s3/s3-loader.py b/model_training/s3/s3-loader.py new file mode 100644 index 00000000..6549f886 --- /dev/null +++ b/model_training/s3/s3-loader.py @@ -0,0 +1,77 @@ +## get client +# client = new__client(args) +## upload all files in mnt path +# _upload(client, mnt_path) + +model_dir="models" +data_dir="data" +machine_spec_dir="machine_spec" + +import os + +def aws_list_keys(client, bucket_name, prefix): + response = client.list_objects_v2(Bucket=bucket_name, Prefix=prefix) + return [obj['Key'] for obj in response.get('Contents', [])] + +def ibmcloud_list_keys(client, bucket_name, prefix): + bucket_obj = client.Bucket(bucket_name) + data_response = bucket_obj.objects.filter(Prefix=prefix) + return [obj.key for obj in data_response] + +def get_bucket_file_map(client, bucket_name, machine_id, mnt_path, pipeline_name, list_func): + bucket_file_map = dict() + if machine_id is not None and machine_id != "": + top_key_path = "/" + machine_id + # add data key map + data_path = os.path.join(mnt_path, data_dir) + datapath_prefix = top_key_path + "/data" + keys = list_func(client, bucket_name, datapath_prefix) + for key in keys: + filepath = key.replace(datapath_prefix, data_path) + bucket_file_map[key] = filepath + # add model key map + model_path = os.path.join(mnt_path, model_dir, pipeline_name) + model_predix = "/models/" + pipeline_name + keys = list_func(client, bucket_name, model_predix) + for key in keys: + filepath = key.replace(model_predix, model_path) + bucket_file_map[key] = filepath + return bucket_file_map + +def aws_download(client, bucket_name, machine_id, mnt_path, pipeline_name): + print("AWS Download") + bucket_file_map = get_bucket_file_map(client, bucket_name, machine_id=machine_id, mnt_path=mnt_path, pipeline_name=pipeline_name, list_func=aws_list_keys) + for key, filepath in bucket_file_map.items(): + print(key, filepath) + dir = os.path.dirname(filepath) + if not os.path.exists(dir): + os.makedirs(dir) + client.download_file(bucket_name, key, filepath) + +def ibm_download(client, bucket_name, machine_id, mnt_path, pipeline_name): + print("IBM Download") + bucket_file_map = get_bucket_file_map(client, bucket_name, machine_id=machine_id, mnt_path=mnt_path, pipeline_name=pipeline_name, list_func=ibmcloud_list_keys) + for key, filepath in bucket_file_map.items(): + print(key, filepath) + dir = os.path.dirname(filepath) + if not os.path.exists(dir): + os.makedirs(dir) + client.Bucket(bucket_name).download_file(key, filepath) + +def add_common_args(subparser): + subparser.add_argument("--bucket-name", help="Bucket name", required=True) + subparser.add_argument("--mnt-path", help="Mount path", required=True) + subparser.add_argument("--pipeline-name", help="Pipeline name") + 
subparser.add_argument("--machine-id", help="Machine ID") + +import argparse +import util + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="S3 Pusher") + args = util.get_command(parser, add_common_args, ibm_download, aws_download) + if hasattr(args, "new_client_func") and hasattr(args, "func"): + client = args.new_client_func(args) + args.func(client, args.bucket_name, args.machine_id, args.mnt_path, args.pipeline_name) + else: + parser.print_help() \ No newline at end of file diff --git a/model_training/s3-pusher/s3-pusher.py b/model_training/s3/s3-pusher.py similarity index 51% rename from model_training/s3-pusher/s3-pusher.py rename to model_training/s3/s3-pusher.py index 8acd1546..0cc8508b 100644 --- a/model_training/s3-pusher/s3-pusher.py +++ b/model_training/s3/s3-pusher.py @@ -3,24 +3,9 @@ ## upload all files in mnt path # _upload(client, mnt_path) -def new_ibm_client(args): - import ibm_boto3 - from ibm_botocore.client import Config - cos = ibm_boto3.resource('s3', - ibm_api_key_id=args.api_key, - ibm_service_instance_id=args.service_instance_id, - config=Config(signature_version='oauth'), - endpoint_url=args.service_endpoint - ) - return cos - -def new_aws_client(args): - import boto3 as aws_boto3 - s3 = aws_boto3.client('s3', aws_access_key_id=args.aws_access_key_id, aws_secret_access_key=args.aws_secret_access_key, region_name=args.region_name) - return s3 - model_dir="models" data_dir="data" +machine_spec_dir="machine_spec" import os def get_bucket_file_map(machine_id, mnt_path, query_data, idle_data): model_path = os.path.join(mnt_path, model_dir) @@ -28,11 +13,12 @@ def get_bucket_file_map(machine_id, mnt_path, query_data, idle_data): top_key_path = "" if machine_id is not None and machine_id != "": top_key_path = "/" + machine_id - for root, _, files in os.walk(model_path): - for file in files: - filepath = os.path.join(root, file) - key = filepath.replace(model_path, top_key_path + "/models") - bucket_file_map[key] = filepath + if os.path.exists(model_path): + for root, _, files in os.walk(model_path): + for file in files: + filepath = os.path.join(root, file) + key = filepath.replace(model_path, "/models") + bucket_file_map[key] = filepath data_path = os.path.join(mnt_path, data_dir) for data_filename in [query_data, idle_data]: if data_filename is not None: @@ -40,6 +26,10 @@ def get_bucket_file_map(machine_id, mnt_path, query_data, idle_data): if os.path.exists(filepath): key = filepath.replace(data_path, top_key_path + "/data") bucket_file_map[key] = filepath + filepath = os.path.join(data_path, machine_spec_dir, machine_id + ".json") + if os.path.exists(filepath): + key = filepath.replace(data_path, top_key_path + "/data") + bucket_file_map[key] = filepath return bucket_file_map def aws_upload(client, bucket_name, machine_id, mnt_path, query_data, idle_data): @@ -64,29 +54,13 @@ def add_common_args(subparser): subparser.add_argument("--machine-id", help="Machine ID") import argparse +import util if __name__ == "__main__": parser = argparse.ArgumentParser(description="S3 Pusher") - subparsers = parser.add_subparsers(title="S3 provider", dest="provider") - - ibm_parser = subparsers.add_parser("ibmcloud", help="IBM Cloud") - ibm_parser.add_argument("--api-key", type=str, help="API key", required=True) - ibm_parser.add_argument("--service-instance-id", type=str, help="Service instance ID", required=True) - ibm_parser.add_argument("--service-endpoint", type=str, help="Service endpoint", required=True) - add_common_args(ibm_parser) - 
ibm_parser.set_defaults(new_client_func=new_ibm_client, upload_func=ibm_upload) - - aws_parser = subparsers.add_parser("aws", help="AWS") - aws_parser.add_argument("--aws-access-key-id", type=str, help="Access key ID", required=True) - aws_parser.add_argument("--aws-secret-access-key", type=str, help="Secret key", required=True) - aws_parser.add_argument("--region-name", type=str, help="Region name", required=True) - add_common_args(aws_parser) - aws_parser.set_defaults(new_client_func=new_aws_client, upload_func=aws_upload) - - args = parser.parse_args() - - if hasattr(args, "new_client_func") and hasattr(args, "upload_func"): + args = util.get_command(parser, add_common_args, ibm_upload, aws_upload) + if hasattr(args, "new_client_func") and hasattr(args, "func"): client = args.new_client_func(args) - args.upload_func(client, args.bucket_name, args.machine_id, args.mnt_path, args.query_data, args.idle_data) + args.func(client, args.bucket_name, args.machine_id, args.mnt_path, args.query_data, args.idle_data) else: parser.print_help() \ No newline at end of file diff --git a/model_training/s3/util.py b/model_training/s3/util.py new file mode 100644 index 00000000..ed5ba42a --- /dev/null +++ b/model_training/s3/util.py @@ -0,0 +1,36 @@ +def new_ibm_client(args): + import ibm_boto3 + from ibm_botocore.client import Config + cos = ibm_boto3.resource('s3', + ibm_api_key_id=args.api_key, + ibm_service_instance_id=args.service_instance_id, + config=Config(signature_version='oauth'), + endpoint_url=args.service_endpoint + ) + return cos + +def new_aws_client(args): + import boto3 as aws_boto3 + s3 = aws_boto3.client('s3', aws_access_key_id=args.aws_access_key_id, aws_secret_access_key=args.aws_secret_access_key, region_name=args.region_name) + return s3 + +def get_command(parser, add_common_args, ibm_func, aws_func): + subparsers = parser.add_subparsers(title="S3 provider", dest="provider") + + ibm_parser = subparsers.add_parser("ibmcloud", help="IBM Cloud") + ibm_parser.add_argument("--api-key", type=str, help="API key", required=True) + ibm_parser.add_argument("--service-instance-id", type=str, help="Service instance ID", required=True) + ibm_parser.add_argument("--service-endpoint", type=str, help="Service endpoint", required=True) + add_common_args(ibm_parser) + ibm_parser.set_defaults(new_client_func=new_ibm_client, func=ibm_func) + + aws_parser = subparsers.add_parser("aws", help="AWS") + aws_parser.add_argument("--aws-access-key-id", type=str, help="Access key ID", required=True) + aws_parser.add_argument("--aws-secret-access-key", type=str, help="Secret key", required=True) + aws_parser.add_argument("--region-name", type=str, help="Region name", required=True) + add_common_args(aws_parser) + aws_parser.set_defaults(new_client_func=new_aws_client, func=aws_func) + + args = parser.parse_args() + + return args \ No newline at end of file diff --git a/model_training/tekton/README.md b/model_training/tekton/README.md index a6357b8f..4e3af9f5 100644 --- a/model_training/tekton/README.md +++ b/model_training/tekton/README.md @@ -40,7 +40,7 @@ Previous step: [Prepare cluster](./README.md#1-prepare-cluster) kind: Secret metadata: name: ibm-cos-secret - type: Opaque + type: Opaque stringData: accessKeyID: ${AWS_ACCESS_KEY_ID} accessSecret: ${AWS_SECRET_ACCESS_KEY} @@ -54,7 +54,7 @@ Previous step: [Prepare cluster](./README.md#1-prepare-cluster) kind: Secret metadata: name: aws-cos-secret - type: Opaque + type: Opaque stringData: serviceEndpoint: ${IBMCLOUD_SERVICE_ENDPOINT} apiKey: 
${IBMCLOUD_API_KEY} @@ -78,7 +78,7 @@ The minimum required pipelinerun for default power model of Kepler on VM is as b ![](../../fig/tekton-kepler-default.png) ``` -kubectl apply -f pipelineruns/kepler-default.yaml +kubectl apply -f examples/single-train/default.yaml ``` > If the secret is not deployed and not specified, the s3-push step will be skipped. @@ -97,13 +97,13 @@ check [single-train](./pipelines/single-train.yaml) pipeline. Example for AbsPower model: ``` -kubectl apply -f pipelineruns/abs-train-pipelinerun.yaml +kubectl apply -f examples/single-train/abs-power.yaml ``` Example of DynPower model: ``` -kubectl apply -f pipelineruns/dyn-train-pipelinerun.yaml +kubectl apply -f examples/single-train/dyn-power.yaml ``` To customize feature metrics, set @@ -137,7 +137,7 @@ check [complete-train](./pipelines/complete-train.yaml) pipeline. > If the secret is not deployed and not specified, the s3-push step will be skipped. ``` -kubectl apply -f pipelineruns/complete-pipelinerun.yaml +kubectl apply -f examples/complete-pipelinerun.yaml ``` To customize `ThirdParty` feature group, set diff --git a/model_training/tekton/pipelineruns/complete-pipelinerun.yaml b/model_training/tekton/examples/complete-pipelinerun.yaml similarity index 55% rename from model_training/tekton/pipelineruns/complete-pipelinerun.yaml rename to model_training/tekton/examples/complete-pipelinerun.yaml index 3e93b533..f7cc14e9 100644 --- a/model_training/tekton/pipelineruns/complete-pipelinerun.yaml +++ b/model_training/tekton/examples/complete-pipelinerun.yaml @@ -13,5 +13,17 @@ spec: params: - name: PIPELINE_NAME value: CompleteTrainPipelineExample + # the below parameters are for short test run + - name: STRESS_ARGS + value: + - "cpu;none;none" + - name: STRESS_TIMEOUT + value: 5 + - name: STRESS_BREAK_INTERVAL + value: 1 + - name: IDLE_COLLECT_INTERVAL + value: 10 + - name: CPU_FREQUENCY_ENABLED + value: false pipelineRef: name: complete-train-pipeline \ No newline at end of file diff --git a/model_training/tekton/examples/single-train/abs-power.yaml b/model_training/tekton/examples/single-train/abs-power.yaml new file mode 100644 index 00000000..f728898f --- /dev/null +++ b/model_training/tekton/examples/single-train/abs-power.yaml @@ -0,0 +1,33 @@ +# example-abs-train-pipeline: +# running pipelines with all default value to train AbsPower model (intel_rapl, BPFOnly) +apiVersion: tekton.dev/v1 +kind: PipelineRun +metadata: + name: example-abs-train-pipeline +spec: + timeouts: + pipeline: "6h" + tasks: "5h50m" + workspaces: + - name: mnt + persistentVolumeClaim: + claimName: task-pvc + params: + - name: PIPELINE_NAME + value: AbsPowerTrainPipelineExample + - name: OUTPUT_TYPE + value: AbsPower +# the below parameters are for short test run + - name: STRESS_ARGS + value: + - "cpu;none;none" + - name: STRESS_TIMEOUT + value: 5 + - name: STRESS_BREAK_INTERVAL + value: 1 + - name: IDLE_COLLECT_INTERVAL + value: 10 + - name: CPU_FREQUENCY_ENABLED + value: false + pipelineRef: + name: single-train-pipeline \ No newline at end of file diff --git a/model_training/tekton/examples/single-train/aws-push.yaml b/model_training/tekton/examples/single-train/aws-push.yaml new file mode 100644 index 00000000..dbe36745 --- /dev/null +++ b/model_training/tekton/examples/single-train/aws-push.yaml @@ -0,0 +1,39 @@ +# test-pipeline-aws +# short run of pipelines to test e2e from collect to train with AWS COS +apiVersion: tekton.dev/v1 +kind: PipelineRun +metadata: + name: test-pipeline-aws +spec: + timeouts: + pipeline: "6h" + tasks: 
"5h50m" + workspaces: + - name: mnt + persistentVolumeClaim: + claimName: task-pvc + params: + - name: PIPELINE_NAME + value: AbsPowerTrainPipelineExample + - name: OUTPUT_TYPE + value: AbsPower + - name: MACHINE_ID + value: test + - name: COS_PROVIDER + value: aws + - name: COS_SECRET_NAME + value: aws-cos-secret +# the below parameters are for short test run + - name: STRESS_ARGS + value: + - "cpu;none;none" + - name: STRESS_TIMEOUT + value: 5 + - name: STRESS_BREAK_INTERVAL + value: 1 + - name: IDLE_COLLECT_INTERVAL + value: 10 + - name: CPU_FREQUENCY_ENABLED + value: false + pipelineRef: + name: single-train-pipeline \ No newline at end of file diff --git a/model_training/tekton/pipelineruns/kepler-default.yaml b/model_training/tekton/examples/single-train/default.yaml similarity index 83% rename from model_training/tekton/pipelineruns/kepler-default.yaml rename to model_training/tekton/examples/single-train/default.yaml index f8990e25..b862739e 100644 --- a/model_training/tekton/pipelineruns/kepler-default.yaml +++ b/model_training/tekton/examples/single-train/default.yaml @@ -1,7 +1,9 @@ +# kepler-default +# running pipelines with all default value to train AbsPower model (intel_rapl, BPFOnly) with COS apiVersion: tekton.dev/v1 kind: PipelineRun metadata: - name: example-abs-train-pipeline + name: default spec: timeouts: pipeline: "6h" diff --git a/model_training/tekton/examples/single-train/dyn-power.yaml b/model_training/tekton/examples/single-train/dyn-power.yaml new file mode 100644 index 00000000..779b407c --- /dev/null +++ b/model_training/tekton/examples/single-train/dyn-power.yaml @@ -0,0 +1,33 @@ +# example-dyn-train-pipeline: +# running pipelines with all default value to train DynPower model (intel_rapl, BPFOnly) +apiVersion: tekton.dev/v1 +kind: PipelineRun +metadata: + name: example-dyn-train-pipeline +spec: + timeouts: + pipeline: "6h" + tasks: "5h50m" + workspaces: + - name: mnt + persistentVolumeClaim: + claimName: task-pvc + params: + - name: PIPELINE_NAME + value: DynPowerTrainPipelineExample + - name: OUTPUT_TYPE + value: DynPower +# the below parameters are for short test run + - name: STRESS_ARGS + value: + - "cpu;none;none" + - name: STRESS_TIMEOUT + value: 5 + - name: STRESS_BREAK_INTERVAL + value: 1 + - name: IDLE_COLLECT_INTERVAL + value: 10 + - name: CPU_FREQUENCY_ENABLED + value: false + pipelineRef: + name: single-train-pipeline \ No newline at end of file diff --git a/model_training/tekton/examples/single-train/ibmcloud-push.yaml b/model_training/tekton/examples/single-train/ibmcloud-push.yaml new file mode 100644 index 00000000..8a334c37 --- /dev/null +++ b/model_training/tekton/examples/single-train/ibmcloud-push.yaml @@ -0,0 +1,39 @@ +# test-pipeline-ibmcloud +# short run of pipelines to test e2e from collect to train with IBMCloud COS +apiVersion: tekton.dev/v1 +kind: PipelineRun +metadata: + name: test-pipeline-ibmcloud +spec: + timeouts: + pipeline: "6h" + tasks: "5h50m" + workspaces: + - name: mnt + persistentVolumeClaim: + claimName: task-pvc + params: + - name: PIPELINE_NAME + value: AbsPowerTrainPipelineExample + - name: OUTPUT_TYPE + value: AbsPower + - name: MACHINE_ID + value: test + - name: COS_PROVIDER + value: ibmcloud + - name: COS_SECRET_NAME + value: ibm-cos-secret +# the below parameters are for short test run + - name: STRESS_ARGS + value: + - "cpu;none;none" + - name: STRESS_TIMEOUT + value: 5 + - name: STRESS_BREAK_INTERVAL + value: 1 + - name: IDLE_COLLECT_INTERVAL + value: 10 + - name: CPU_FREQUENCY_ENABLED + value: false + 
pipelineRef: + name: single-train-pipeline \ No newline at end of file diff --git a/model_training/tekton/examples/test-collect.yaml b/model_training/tekton/examples/test-collect.yaml new file mode 100644 index 00000000..7bf1209e --- /dev/null +++ b/model_training/tekton/examples/test-collect.yaml @@ -0,0 +1,30 @@ +# test-collect +# short run of pipelines to test collecting data +apiVersion: tekton.dev/v1 +kind: PipelineRun +metadata: + name: test-collect +spec: + timeouts: + pipeline: "6h" + tasks: "5h50m" + workspaces: + - name: mnt + persistentVolumeClaim: + claimName: task-pvc + params: + - name: MACHINE_ID + value: test + - name: STRESS_ARGS + value: + - "cpu;none;none" + - name: STRESS_TIMEOUT + value: 5 + - name: STRESS_BREAK_INTERVAL + value: 1 + - name: IDLE_COLLECT_INTERVAL + value: 10 + - name: CPU_FREQUENCY_ENABLED + value: false + pipelineRef: + name: collect-data-pipeline \ No newline at end of file diff --git a/model_training/tekton/pipelineruns/abs-train-pipelinerun.yaml b/model_training/tekton/examples/test-retrain.yaml similarity index 82% rename from model_training/tekton/pipelineruns/abs-train-pipelinerun.yaml rename to model_training/tekton/examples/test-retrain.yaml index e0c75c00..4ce505d7 100644 --- a/model_training/tekton/pipelineruns/abs-train-pipelinerun.yaml +++ b/model_training/tekton/examples/test-retrain.yaml @@ -1,7 +1,7 @@ apiVersion: tekton.dev/v1 kind: PipelineRun metadata: - name: example-abs-train-pipeline + name: test-retrain-ibmcloud spec: timeouts: pipeline: "6h" @@ -16,4 +16,4 @@ spec: - name: OUTPUT_TYPE value: AbsPower pipelineRef: - name: single-train-pipeline \ No newline at end of file + name: single-retrain-pipeline \ No newline at end of file diff --git a/model_training/tekton/pipelineruns/dyn-train-pipelinerun.yaml b/model_training/tekton/pipelineruns/dyn-train-pipelinerun.yaml deleted file mode 100644 index 1c50799b..00000000 --- a/model_training/tekton/pipelineruns/dyn-train-pipelinerun.yaml +++ /dev/null @@ -1,19 +0,0 @@ -apiVersion: tekton.dev/v1 -kind: PipelineRun -metadata: - name: example-dyn-train-pipeline -spec: - timeouts: - pipeline: "6h" - tasks: "5h50m" - workspaces: - - name: mnt - persistentVolumeClaim: - claimName: task-pvc - params: - - name: PIPELINE_NAME - value: DynPowerTrainPipelineExample - - name: OUTPUT_TYPE - value: DynPower - pipelineRef: - name: single-train-pipeline \ No newline at end of file diff --git a/model_training/tekton/pipelines/collect.yaml b/model_training/tekton/pipelines/collect.yaml new file mode 100644 index 00000000..ec9112a3 --- /dev/null +++ b/model_training/tekton/pipelines/collect.yaml @@ -0,0 +1,185 @@ +############################################## +## +## collect-data-pipeline: +## +## - presteps (collect metrics at idle state and record start time) +## - run stressng workloads (tasks/stressng-task.yaml) +## - collect metrics (record end time and collect metrics when running stressng) +## - push to s3 +## +############################################## +apiVersion: tekton.dev/v1 +kind: Pipeline +metadata: + name: collect-data-pipeline +spec: + workspaces: + - name: mnt + description: Mount path + params: + - name: MODEL_SERVER_IMAGE + description: Specify model server image + default: quay.io/sustainable_computing_io/kepler_model_server:v0.7 + - name: IDLE_COLLECT_INTERVAL + description: Specify interval time to collect profile (idle) data before start the workload + default: 100 + - name: STRESS_BREAK_INTERVAL + description: Specify break interval between each stress load + default: 5 + - 
name: STRESS_TIMEOUT + description: Specify stress duration (timeout to stop stress) + default: 20 + - name: STRESS_ARGS + description: List arguments for CPU frequency and stressng workload (CPU_FREQUENCY;STRESS_LOAD;STRESS_INSTANCE_NUM;STRESS_EXTRA_PARAM_KEYS;STRESS_EXTRA_PARAM_VALS) + type: array + default: + - "cpu;none;none" + - "branch;none;none" + - "regs;none;none" + - "l1cache;none;none" + - "cache;none;none" + # - "stream;none;none" + # - "vm-rw;vm-rw-bytes;4G" + # - "sctp;none;none" + - name: THIRDPARTY_METRICS + description: Specify list of third party metric to export (required only for ThirdParty feature group) + default: "" + - name: COS_PROVIDER + description: Specify COS provider (supported choices are ibmcloud, aws) + default: "" + - name: COS_SECRET_NAME + description: Specify COS secret name + default: "" + - name: MACHINE_ID + description: Specify machine id to group model result in bucket + default: "" + - name: CPU_FREQUENCY_ENABLED + description: Enable/Disable CPU Frequency Steps + default: true + tasks: + - name: presteps + params: + - name: IDLE_COLLECT_INTERVAL + value: $(params.IDLE_COLLECT_INTERVAL) + - name: THIRDPARTY_METRICS + value: $(params.THIRDPARTY_METRICS) + - name: MODEL_SERVER_IMAGE + value: $(params.MODEL_SERVER_IMAGE) + taskSpec: + workspaces: + - name: mnt + optional: true + params: + - name: IDLE_COLLECT_INTERVAL + - name: THIRDPARTY_METRICS + - name: MODEL_SERVER_IMAGE + results: + - name: stress-start-time + description: The time recorded before running the workload + steps: + - name: sleep + image: bash:5.2 + script: | + #!/usr/bin/env bash + sleep $(params.IDLE_COLLECT_INTERVAL) + - name: collect-idle + image: $(params.MODEL_SERVER_IMAGE) + args: + - cmd/main.py + - query + - --data-path=$(workspaces.mnt.path)/data + - --interval=$(params.IDLE_COLLECT_INTERVAL) + - --thirdparty-metrics="$(params.THIRDPARTY_METRICS)" + - --benchmark=idle + - -o=idle + command: ["python3.8"] + env: + - name: PROM_SERVER + value: http://prometheus-k8s.monitoring.svc:9090 + - name: record-start-time + image: bash:5.2 + script: | + #!/usr/bin/env bash + echo -n $(date +%Y-%m-%dT%H:%M:%SZ) > $(results.stress-start-time.path) + - name: run-stressng + runAfter: [presteps] + taskRef: + name: run-stressng + timeout: "5h" + params: + - name: INTERVAL + value: $(params.STRESS_BREAK_INTERVAL) + - name: TIMEOUT + value: $(params.STRESS_TIMEOUT) + - name: arguments + value: $(params.STRESS_ARGS[*]) + - name: CPU_FREQUENCY_ENABLED + value: $(params.CPU_FREQUENCY_ENABLED) + - name: collect-metric + runAfter: [run-stressng] + params: + - name: THIRDPARTY_METRICS + value: $(params.THIRDPARTY_METRICS) + - name: MODEL_SERVER_IMAGE + value: $(params.MODEL_SERVER_IMAGE) + taskSpec: + workspaces: + - name: mnt + optional: true + params: + - name: BENCHMARK + default: stressng + - name: THIRDPARTY_METRICS + - name: MODEL_SERVER_IMAGE + steps: + - name: collect-stressng + image: $(params.MODEL_SERVER_IMAGE) + args: + - cmd/main.py + - query + - --data-path=$(workspaces.mnt.path)/data + - --start-time=$(tasks.presteps.results.stress-start-time) + - --end-time=$(tasks.run-stressng.results.stress-end-time) + - --thirdparty-metrics="$(params.THIRDPARTY_METRICS)" + - --benchmark=stressng + - -o=kepler_query + command: ["python3.8"] + env: + - name: PROM_SERVER + value: http://prometheus-k8s.monitoring.svc:9090 + - name: ibmcloud-s3-push + runAfter: [collect-metric] + when: + - input: "$(params.COS_PROVIDER)" + operator: in + values: ["ibmcloud"] + - input: "$(params.COS_SECRET_NAME)" + 
operator: notin + values: [""] + workspaces: + - name: mnt + params: + - name: COS_SECRET_NAME + value: $(params.COS_SECRET_NAME) + - name: MACHINE_ID + value: $(params.MACHINE_ID) + taskRef: + name: ibmcloud-s3-push + - name: aws-s3-push + runAfter: [collect-metric] + when: + - input: "$(params.COS_PROVIDER)" + operator: in + values: ["aws"] + - input: "$(params.COS_SECRET_NAME)" + operator: notin + values: [""] + workspaces: + - name: mnt + params: + - name: COS_SECRET_NAME + value: $(params.COS_SECRET_NAME) + - name: MACHINE_ID + value: $(params.MACHINE_ID) + taskRef: + name: aws-s3-push \ No newline at end of file diff --git a/model_training/tekton/pipelines/complete-retrain.yaml b/model_training/tekton/pipelines/complete-retrain.yaml new file mode 100644 index 00000000..56e0f653 --- /dev/null +++ b/model_training/tekton/pipelines/complete-retrain.yaml @@ -0,0 +1,159 @@ +############################################## +## +## complete-retrain-pipeline: +## +## - run original model server pipeline which produces AbsPower and DynPower +## for all available feature groups +## +############################################## +apiVersion: tekton.dev/v1 +kind: Pipeline +metadata: + name: complete-retrain-pipeline +spec: + workspaces: + - name: mnt + description: Mount path + params: + - name: MODEL_SERVER_IMAGE + description: Specify model server image + default: quay.io/sustainable_computing_io/kepler_model_server:v0.7 + - name: PIPELINE_NAME + description: Specify pipeline name (output prefix/folder) + - name: ENERGY_SOURCE + description: Specify target energy sources (check https://sustainable-computing.io/kepler_model_server/pipeline/#energy-source) + default: acpi,intel_rapl + - name: EXTRACTOR + description: Specify extractor class (default or smooth) + default: default + - name: ISOLATOR + description: Specify isolator class (none, min, profile, or trainer (if ABS_PIPELINE_NAME is set) + default: min + - name: ABS_TRAINERS + description: Specify trainer names for AbsPower training (use comma(,) as delimiter) + default: default + - name: DYN_TRAINERS + description: Specify trainer names for DynPower training (use comma(,) as delimiter) + default: default + - name: THIRDPARTY_METRICS + description: Specify list of third party metric to export (required only for ThirdParty feature group) + default: "" + - name: ABS_PIPELINE_NAME + description: Specify pipeline name to be used for initializing trainer isolator + default: "" + - name: COS_PROVIDER + description: Specify COS provider (supported choices are ibmcloud, aws) + default: "" + - name: COS_SECRET_NAME + description: Specify COS secret name + default: "" + - name: MACHINE_ID + description: Specify machine id to group model result in bucket + default: "" + - name: LOAD_DATA + description: Specify whether to load data + default: true + tasks: + - name: ibmcloud-s3-load + when: + - input: "$(params.LOAD_DATA)" + operator: in + values: ["true"] + - input: "$(params.COS_PROVIDER)" + operator: in + values: ["ibmcloud"] + - input: "$(params.COS_SECRET_NAME)" + operator: notin + values: [""] + workspaces: + - name: mnt + params: + - name: COS_SECRET_NAME + value: $(params.COS_SECRET_NAME) + - name: MACHINE_ID + value: $(params.MACHINE_ID) + - name: PIPELINE_NAME + value: $(params.PIPELINE_NAME) + taskRef: + name: ibmcloud-s3-load + - name: aws-s3-load + when: + - input: "$(params.LOAD_DATA)" + operator: in + values: ["true"] + - input: "$(params.COS_PROVIDER)" + operator: in + values: ["aws"] + - input: "$(params.COS_SECRET_NAME)" + operator: 
notin + values: [""] + workspaces: + - name: mnt + params: + - name: COS_SECRET_NAME + value: $(params.COS_SECRET_NAME) + - name: MACHINE_ID + value: $(params.MACHINE_ID) + - name: PIPELINE_NAME + value: $(params.PIPELINE_NAME) + taskRef: + name: aws-s3-load + - name: train-from-query + runAfter: [ibmcloud-s3-load, aws-s3-load] + workspaces: + - name: mnt + taskRef: + name: original-pipeline-task + params: + - name: MODEL_SERVER_IMAGE + value: $(params.MODEL_SERVER_IMAGE) + - name: PIPELINE_NAME + value: $(params.PIPELINE_NAME) + - name: EXTRACTOR + value: $(params.EXTRACTOR) + - name: ISOLATOR + value: $(params.ISOLATOR) + - name: ABS_TRAINERS + value: $(params.ABS_TRAINERS) + - name: DYN_TRAINERS + value: $(params.DYN_TRAINERS) + - name: ENERGY_SOURCE + value: $(params.ENERGY_SOURCE) + - name: THIRDPARTY_METRICS + value: $(params.THIRDPARTY_METRICS) + - name: ibmcloud-s3-push + runAfter: [train-from-query] + when: + - input: "$(params.COS_PROVIDER)" + operator: in + values: ["ibmcloud"] + - input: "$(params.COS_SECRET_NAME)" + operator: notin + values: [""] + workspaces: + - name: mnt + params: + - name: COS_SECRET_NAME + value: $(params.COS_SECRET_NAME) + - name: MACHINE_ID + value: $(params.MACHINE_ID) + taskRef: + name: ibmcloud-s3-push + - name: aws-s3-push + runAfter: [train-from-query] + when: + - input: "$(params.COS_PROVIDER)" + operator: in + values: ["aws"] + - input: "$(params.COS_SECRET_NAME)" + operator: notin + values: [""] + workspaces: + - name: mnt + params: + - name: COS_SECRET_NAME + value: $(params.COS_SECRET_NAME) + - name: MACHINE_ID + value: $(params.MACHINE_ID) + taskRef: + name: aws-s3-push \ No newline at end of file diff --git a/model_training/tekton/pipelines/complete-train.yaml b/model_training/tekton/pipelines/complete-train.yaml index 2c729b45..a970db34 100644 --- a/model_training/tekton/pipelines/complete-train.yaml +++ b/model_training/tekton/pipelines/complete-train.yaml @@ -74,6 +74,9 @@ spec: - name: MACHINE_ID description: Specify machine id to group model result in bucket default: "" + - name: CPU_FREQUENCY_ENABLED + description: Enable/Disable CPU Frequency Steps + default: true tasks: - name: presteps params: @@ -126,6 +129,8 @@ spec: value: $(params.STRESS_TIMEOUT) - name: arguments value: $(params.STRESS_ARGS[*]) + - name: CPU_FREQUENCY_ENABLED + value: $(params.CPU_FREQUENCY_ENABLED) - name: collect-metric runAfter: [run-stressng] params: diff --git a/model_training/tekton/pipelines/retrain.yaml b/model_training/tekton/pipelines/single-retrain.yaml similarity index 80% rename from model_training/tekton/pipelines/retrain.yaml rename to model_training/tekton/pipelines/single-retrain.yaml index bfafb291..8baac46c 100644 --- a/model_training/tekton/pipelines/retrain.yaml +++ b/model_training/tekton/pipelines/single-retrain.yaml @@ -1,6 +1,6 @@ ############################################## ## -## retrain-pipeline: +## single-retrain-pipeline ## ## - For AbsPower, extract kepler metrics in node level into dataframe (tasks/extract.yaml) ## - For DynPower, extract kepler metrics into dataframe in container level and remove background power (tasks/isolate.yaml) @@ -10,7 +10,7 @@ apiVersion: tekton.dev/v1 kind: Pipeline metadata: - name: retrain-pipeline + name: single-retrain-pipeline spec: workspaces: - name: mnt @@ -53,8 +53,56 @@ spec: - name: MACHINE_ID description: Specify machine id to group model result in bucket default: "" + - name: LOAD_DATA + description: Specify whether to load data + default: true tasks: + - name: ibmcloud-s3-load + when: + 
- input: "$(params.LOAD_DATA)" + operator: in + values: ["true"] + - input: "$(params.COS_PROVIDER)" + operator: in + values: ["ibmcloud"] + - input: "$(params.COS_SECRET_NAME)" + operator: notin + values: [""] + workspaces: + - name: mnt + params: + - name: COS_SECRET_NAME + value: $(params.COS_SECRET_NAME) + - name: MACHINE_ID + value: $(params.MACHINE_ID) + - name: PIPELINE_NAME + value: $(params.PIPELINE_NAME) + taskRef: + name: ibmcloud-s3-load + - name: aws-s3-load + when: + - input: "$(params.LOAD_DATA)" + operator: in + values: ["true"] + - input: "$(params.COS_PROVIDER)" + operator: in + values: ["aws"] + - input: "$(params.COS_SECRET_NAME)" + operator: notin + values: [""] + workspaces: + - name: mnt + params: + - name: COS_SECRET_NAME + value: $(params.COS_SECRET_NAME) + - name: MACHINE_ID + value: $(params.MACHINE_ID) + - name: PIPELINE_NAME + value: $(params.PIPELINE_NAME) + taskRef: + name: aws-s3-load - name: extract + runAfter: [ibmcloud-s3-load, aws-s3-load] when: - input: "$(params.OUTPUT_TYPE)" operator: in @@ -158,8 +206,8 @@ spec: value: $(params.TRAINERS) - name: THIRDPARTY_METRICS value: $(params.THIRDPARTY_METRICS) - finally: - name: ibmcloud-s3-push + runAfter: [train-absolute-power-model, train-dynamic-power-model] when: - input: "$(params.COS_PROVIDER)" operator: in @@ -177,6 +225,7 @@ spec: taskRef: name: ibmcloud-s3-push - name: aws-s3-push + runAfter: [train-absolute-power-model, train-dynamic-power-model] when: - input: "$(params.COS_PROVIDER)" operator: in diff --git a/model_training/tekton/pipelines/single-train.yaml b/model_training/tekton/pipelines/single-train.yaml index 2c38a0ba..91e29217 100644 --- a/model_training/tekton/pipelines/single-train.yaml +++ b/model_training/tekton/pipelines/single-train.yaml @@ -77,6 +77,9 @@ spec: - name: MACHINE_ID description: Specify machine id to group model result in bucket default: "" + - name: CPU_FREQUENCY_ENABLED + description: Enable/Disable CPU Frequency Steps + default: true tasks: - name: presteps params: @@ -134,6 +137,8 @@ spec: value: $(params.STRESS_TIMEOUT) - name: arguments value: $(params.STRESS_ARGS[*]) + - name: CPU_FREQUENCY_ENABLED + value: $(params.CPU_FREQUENCY_ENABLED) - name: collect-metric runAfter: [run-stressng] params: diff --git a/model_training/tekton/push-taskrun.yaml b/model_training/tekton/push-taskrun.yaml deleted file mode 100644 index 26d53957..00000000 --- a/model_training/tekton/push-taskrun.yaml +++ /dev/null @@ -1,16 +0,0 @@ -apiVersion: tekton.dev/v1 -kind: TaskRun -metadata: - name: push-s3 -spec: - workspaces: - - name: mnt - persistentVolumeClaim: - claimName: task-pvc - params: - - name: COS_PROVIDER - value: ibmcloud - - name: COS_SECRET_NAME - value: ibm-cos-secret - taskRef: - name: ibmcloud-s3-push \ No newline at end of file diff --git a/model_training/tekton/tasks/s3/aws-s3-load.yaml b/model_training/tekton/tasks/s3/aws-s3-load.yaml new file mode 100644 index 00000000..269e84f4 --- /dev/null +++ b/model_training/tekton/tasks/s3/aws-s3-load.yaml @@ -0,0 +1,58 @@ +###################################### +## +## s3-push task for AWS +## +###################################### +apiVersion: tekton.dev/v1 +kind: Task +metadata: + name: aws-s3-load +spec: + params: + - name: COS_SECRET_NAME + description: Specify cos secret name + default: "" + - name: MACHINE_ID + description: Specify machine id to group model result in bucket + default: "" + - name: PIPELINE_NAME + description: Specify pipeline name (output prefix/folder) + default: default + workspaces: + - name: mnt + 
optional: true + steps: + - name: load + image: quay.io/sustainability/kepler_model_server/s3:v0.7 + env: + - name: ACCESS_KEY_ID + valueFrom: + secretKeyRef: + name: $(params.COS_SECRET_NAME) + key: accessKeyID + - name: ACCESS_SECRET + valueFrom: + secretKeyRef: + name: $(params.COS_SECRET_NAME) + key: accessSecret + - name: REGION_NAME + valueFrom: + secretKeyRef: + name: $(params.COS_SECRET_NAME) + key: regionName + - name: BUCKET_NAME + valueFrom: + secretKeyRef: + name: $(params.COS_SECRET_NAME) + key: bucketName + command: ["python3.8"] + args: + - s3-loader.py + - aws + - --aws-access-key-id=$(ACCESS_KEY_ID) + - --aws-secret-access-key=$(ACCESS_SECRET) + - --region-name=$(REGION_NAME) + - --bucket-name=$(BUCKET_NAME) + - --mnt-path=$(workspaces.mnt.path) + - --pipeline-name=$(params.PIPELINE_NAME) + - --machine-id=$(params.MACHINE_ID) \ No newline at end of file diff --git a/model_training/tekton/tasks/s3-pusher/aws.yaml b/model_training/tekton/tasks/s3/aws-s3-push.yaml similarity index 95% rename from model_training/tekton/tasks/s3-pusher/aws.yaml rename to model_training/tekton/tasks/s3/aws-s3-push.yaml index d43ff906..6e3547b0 100644 --- a/model_training/tekton/tasks/s3-pusher/aws.yaml +++ b/model_training/tekton/tasks/s3/aws-s3-push.yaml @@ -20,7 +20,7 @@ spec: optional: true steps: - name: push - image: quay.io/sustainability/kepler_model_server/s3_pusher:v0.7 + image: quay.io/sustainability/kepler_model_server/s3:v0.7 env: + - name: ACCESS_KEY_ID + valueFrom: diff --git a/model_training/tekton/tasks/s3/ibmcloud-s3-load.yaml b/model_training/tekton/tasks/s3/ibmcloud-s3-load.yaml new file mode 100644 index 00000000..ef642704 --- /dev/null +++ b/model_training/tekton/tasks/s3/ibmcloud-s3-load.yaml @@ -0,0 +1,58 @@ +###################################### +## +## s3-load task for IBM Cloud +## +###################################### +apiVersion: tekton.dev/v1 +kind: Task +metadata: + name: ibmcloud-s3-load +spec: + params: + - name: COS_SECRET_NAME + description: Specify COS secret name + default: "" + - name: MACHINE_ID + description: Specify machine id to group model result in bucket + default: "" + - name: PIPELINE_NAME + description: Specify pipeline name (output prefix/folder) + default: default + workspaces: + - name: mnt + optional: true + steps: + - name: load + image: quay.io/sustainability/kepler_model_server/s3:v0.7 + env: + - name: SERVICE_ENDPOINT + valueFrom: + secretKeyRef: + name: $(params.COS_SECRET_NAME) + key: serviceEndpoint + - name: API_KEY + valueFrom: + secretKeyRef: + name: $(params.COS_SECRET_NAME) + key: apiKey + - name: SERVICE_INSTANCE_ID + valueFrom: + secretKeyRef: + name: $(params.COS_SECRET_NAME) + key: serviceInstanceID + - name: BUCKET_NAME + valueFrom: + secretKeyRef: + name: $(params.COS_SECRET_NAME) + key: bucketName + command: ["python3.8"] + args: + - s3-loader.py + - ibmcloud + - --service-endpoint=$(SERVICE_ENDPOINT) + - --api-key=$(API_KEY) + - --service-instance-id=$(SERVICE_INSTANCE_ID) + - --bucket-name=$(BUCKET_NAME) + - --mnt-path=$(workspaces.mnt.path) + - --pipeline-name=$(params.PIPELINE_NAME) + - --machine-id=$(params.MACHINE_ID) \ No newline at end of file diff --git a/model_training/tekton/tasks/s3-pusher/ibmcloud.yaml b/model_training/tekton/tasks/s3/ibmcloud-s3-push.yaml similarity index 95% rename from model_training/tekton/tasks/s3-pusher/ibmcloud.yaml rename to model_training/tekton/tasks/s3/ibmcloud-s3-push.yaml index ff1bb34a..93ed26bf 100644 --- a/model_training/tekton/tasks/s3-pusher/ibmcloud.yaml +++ 
b/model_training/tekton/tasks/s3/ibmcloud-s3-push.yaml @@ -20,7 +20,7 @@ spec: optional: true steps: - name: push - image: quay.io/sustainability/kepler_model_server/s3_pusher:v0.7 + image: quay.io/sustainability/kepler_model_server/s3:v0.7 env: - name: SERVICE_ENDPOINT valueFrom: diff --git a/model_training/tekton/tasks/stressng-task.yaml b/model_training/tekton/tasks/stressng-task.yaml index 0031159c..ef6450f4 100644 --- a/model_training/tekton/tasks/stressng-task.yaml +++ b/model_training/tekton/tasks/stressng-task.yaml @@ -20,6 +20,9 @@ spec: - name: INTERVAL description: Interval between each run default: 5 + - name: CPU_FREQUENCY_ENABLED + description: Enable/Disable CPU Frequency Steps + default: true results: - name: stress-end-time description: The time recorded after running the workload @@ -150,7 +153,7 @@ spec: TOTAL_CPUS=$(lscpu|grep "CPU(s):"|head -n 1|awk '{print $2}') OVERCOMMIT=$((TOTAL_CPUS + 8)) - if [ -f "/sys/devices/system/cpu/cpufreq/policy0/scaling_max_freq" ]; then + if [ -f "/sys/devices/system/cpu/cpufreq/policy0/scaling_max_freq" ] && [ "$CPU_FREQUENCY_ENABLED" = true ]; then FREQ_COUNT=$(((MAX_FREQ-START_FREQ)/CPU_FREQUENCY_STEP+1)) else FREQ_COUNT=1 @@ -167,7 +170,7 @@ spec: echo "Estimation Time (s): $ESTIMATE_SEC" for argument in "$@"; do - if [ -f "/sys/devices/system/cpu/cpufreq/policy0/scaling_max_freq" ]; then + if [ -f "/sys/devices/system/cpu/cpufreq/policy0/scaling_max_freq" ] && [ "$CPU_FREQUENCY_ENABLED" = true ]; then echo "Frequency range: $START_FREQ $CPU_FREQUENCY_STEP $MAX_FREQ" for freq in $(seq $START_FREQ $CPU_FREQUENCY_STEP $MAX_FREQ); do run_freq $freq $argument $TOTAL_CPUS $OVERCOMMIT diff --git a/src/estimate/estimator.py b/src/estimate/estimator.py index 09b30d1c..fc464df6 100644 --- a/src/estimate/estimator.py +++ b/src/estimate/estimator.py @@ -54,12 +54,14 @@ def handle_request(data): return {"powers": dict(), "msg": msg} output_type = ModelOutputType[power_request.output_type] - energy_source = power_request.energy_source + # TODO: need revisit if get more than one rapl energy source + if power_request.energy_source is None or 'rapl' in power_request.energy_source: + power_request.energy_source = "intel_rapl" if output_type.name not in loaded_model: loaded_model[output_type.name] = dict() output_path = "" - if energy_source not in loaded_model[output_type.name]: + if power_request.energy_source not in loaded_model[output_type.name]: output_path = get_download_output_path(download_path, power_request.energy_source, output_type) if not os.path.exists(output_path): # try connecting to model server @@ -75,11 +77,11 @@ def handle_request(data): print("load model from config: ", output_path) else: print("load model from model server: ", output_path) - loaded_model[output_type.name][energy_source] = load_downloaded_model(power_request.energy_source, output_type) + loaded_model[output_type.name][power_request.energy_source] = load_downloaded_model(power_request.energy_source, output_type) # remove loaded model shutil.rmtree(output_path) - model = loaded_model[output_type.name][energy_source] + model = loaded_model[output_type.name][power_request.energy_source] powers, msg = model.get_power(power_request.datapoint) if msg != "": print("{} fail to predict, removed: {}".format(model.model_name, msg)) diff --git a/src/server/model_server.py b/src/server/model_server.py index df378e98..05282d58 100644 --- a/src/server/model_server.py +++ b/src/server/model_server.py @@ -14,7 +14,7 @@ from util.train_types import get_valid_feature_groups, 
ModelOutputType, FeatureGroups, FeatureGroup from util.config import getConfig, model_toppath, ERROR_KEY, MODEL_SERVER_MODEL_REQ_PATH, MODEL_SERVER_MODEL_LIST_PATH, initial_pipeline_url, download_path -from util.loader import parse_filters, is_valid_model, load_json, load_weight, get_model_group_path, get_archived_file, METADATA_FILENAME, CHECKPOINT_FOLDERNAME, get_pipeline_path, any_node_type, is_matched_type +from util.loader import parse_filters, is_valid_model, load_json, load_weight, get_model_group_path, get_archived_file, METADATA_FILENAME, CHECKPOINT_FOLDERNAME, get_pipeline_path, any_node_type, is_matched_type, lr_trainers ############################################### # model request @@ -47,6 +47,8 @@ def select_best_model(valid_groupath, filters, trainer_name="", node_type=any_no f != CHECKPOINT_FOLDERNAME \ and not os.path.isfile(os.path.join(valid_groupath,f)) \ and (trainer_name == "" or trainer_name in f)] + if weight: + model_names = [name for name in model_names if name.split("_")[0] in lr_trainers] # Load metadata of trainers best_cadidate = None best_response = None @@ -81,6 +83,11 @@ def get_model(): model_request = request.get_json() print("get request /model: {}".format(model_request)) req = ModelRequest(**model_request) + energy_source = req.source + # TODO: need revisit if get more than one rapl energy source + if energy_source is None or 'rapl' in energy_source: + energy_source = 'intel_rapl' + # find valid feature groups from available metrics in request valid_fgs = get_valid_feature_groups(req.metrics) # parse filtering conditions of metadata attribute from request if exists (e.g., minimum mae) @@ -90,7 +97,7 @@ def get_model(): best_response = None # find best model comparing best candidate from each valid feature group complied with filtering conditions for fg in valid_fgs: - valid_groupath = get_model_group_path(model_toppath, output_type, fg, req.source) + valid_groupath = get_model_group_path(model_toppath, output_type, fg, energy_source) if os.path.exists(valid_groupath): best_candidate, response = select_best_model(valid_groupath, filters, req.trainer_name, req.node_type, req.weight) if best_candidate is None: @@ -134,8 +141,8 @@ def get_available_models(): output_types = [ot for ot in ModelOutputType] else: output_types = [ModelOutputType[ot]] - - if energy_source is None: + # TODO: need revisit if get more than one rapl energy source + if energy_source is None or 'rapl' in energy_source: energy_source = 'intel_rapl' if filter is None: diff --git a/src/util/__init__.py b/src/util/__init__.py index 20f07372..c9339622 100644 --- a/src/util/__init__.py +++ b/src/util/__init__.py @@ -8,5 +8,5 @@ from saver import assure_path, save_csv, save_json, save_pkl, save_metadata, save_scaler, save_weight from config import getConfig, model_toppath from prom_types import get_valid_feature_group_from_queries -from train_types import SYSTEM_FEATURES, COUNTER_FEAUTRES, CGROUP_FEATURES, BPF_FEATURES, IRQ_FEATURES, KUBELET_FEATURES, WORKLOAD_FEATURES +from train_types import SYSTEM_FEATURES, COUNTER_FEAUTRES, CGROUP_FEATURES, BPF_FEATURES, IRQ_FEATURES, WORKLOAD_FEATURES from train_types import PowerSourceMap, FeatureGroup, FeatureGroups, ModelOutputType, get_feature_group \ No newline at end of file diff --git a/src/util/config.py b/src/util/config.py index 2bb0248f..c13a1e13 100644 --- a/src/util/config.py +++ b/src/util/config.py @@ -13,7 +13,7 @@ ################################################# import os -from loader import get_url, get_pipeline_url +from loader 
import get_url, get_pipeline_url, default_init_model_url from train_types import ModelOutputType, is_support_output_type # must be writable (for shared volume mount) @@ -63,7 +63,8 @@ def getPath(subpath): CONFIG_PATH = getConfig('CONFIG_PATH', CONFIG_PATH) -initial_pipeline_url = getConfig('INITIAL_PIPELINE_URL', get_pipeline_url()) +model_topurl = getConfig('MODEL_TOPURL', default_init_model_url) +initial_pipeline_url = getConfig('INITIAL_PIPELINE_URL', get_pipeline_url(model_topurl=model_topurl)) model_toppath = getConfig('MODEL_PATH', getPath(MODEL_FOLDERNAME)) download_path = getConfig('MODEL_PATH', getPath(DOWNLOAD_FOLDERNAME)) @@ -118,14 +119,14 @@ def get_energy_source(prefix): return DEFAULT_COMPONENTS_SOURCE # get_init_model_url: get initial model from URL if estimator is enabled -def get_init_model_url(energy_source, output_type): +def get_init_model_url(energy_source, output_type, model_topurl=model_topurl): for prefix in modelConfigPrefix: if get_energy_source(prefix) == energy_source: modelURL = get_init_url(prefix) print("get init url", modelURL) if modelURL == "" and is_support_output_type(output_type): print("init URL is not set, try using default URL".format(output_type)) - return get_url(output_type=ModelOutputType[output_type], energy_source=energy_source) + return get_url(output_type=ModelOutputType[output_type], energy_source=energy_source, model_topurl=model_topurl) else: return modelURL print("no match config for {}, {}".format(output_type, energy_source)) diff --git a/src/util/loader.py b/src/util/loader.py index 9aeca4fc..98577e2e 100644 --- a/src/util/loader.py +++ b/src/util/loader.py @@ -13,15 +13,13 @@ VALUE_DELIMIT = ':' ARRAY_DELIMIT = ',' -DEFAULT_PIPELINE = 'default' CHECKPOINT_FOLDERNAME = 'checkpoint' PREPROCESS_FOLDERNAME = "preprocessed_data" -# TODO: change to v0.7 when the model is updated to database +# TODO: change to v0.7 when the model is updated to database, need document update # default_init_model_url = "https://raw.githubusercontent.com/sustainable-computing-io/kepler-model-db/main/models/v0.7/nx12" -# default_init_pipeline_name = "std_v0.7" +DEFAULT_PIPELINE = "std_v0.7" default_init_model_url = "https://raw.githubusercontent.com/sustainable-computing-io/kepler-model-db/main/models/v0.6/nx12" -default_init_pipeline_name = "std_v0.6" default_trainer_name = "GradientBoostingRegressorTrainer" default_node_type = "1" any_node_type = -1 @@ -243,7 +241,7 @@ def get_download_output_path(download_path, energy_source, output_type): energy_source_path = assure_path(os.path.join(download_path, energy_source)) return os.path.join(energy_source_path, output_type.name) -def get_url(output_type, feature_group=default_feature_group, trainer_name=default_trainer_name, node_type=default_node_type, model_topurl=default_init_model_url, energy_source="intel_rapl", pipeline_name=default_init_pipeline_name, model_name=None, weight=False): +def get_url(output_type, feature_group=default_feature_group, trainer_name=default_trainer_name, node_type=default_node_type, model_topurl=default_init_model_url, energy_source="intel_rapl", pipeline_name=DEFAULT_PIPELINE, model_name=None, weight=False): group_path = get_model_group_path(model_topurl, output_type=output_type, feature_group=feature_group, energy_source=energy_source, pipeline_name=pipeline_name, assure=False) if model_name is None: model_name = get_model_name(trainer_name, node_type) @@ -252,7 +250,7 @@ def get_url(output_type, feature_group=default_feature_group, trainer_name=defau file_ext = ".json" return 
os.path.join(group_path, model_name + file_ext) -def get_pipeline_url(model_topurl=default_init_model_url, pipeline_name=default_init_pipeline_name, weight=False): +def get_pipeline_url(model_topurl=default_init_model_url, pipeline_name=DEFAULT_PIPELINE, weight=False): file_ext = ".zip" if weight: file_ext = ".json" diff --git a/src/util/train_types.py b/src/util/train_types.py index cd90c867..89d73264 100644 --- a/src/util/train_types.py +++ b/src/util/train_types.py @@ -18,10 +18,9 @@ CGROUP_FEATURES = ["cgroupfs_cpu_usage_us", "cgroupfs_memory_usage_bytes", "cgroupfs_system_cpu_usage_us", "cgroupfs_user_cpu_usage_us"] BPF_FEATURES = ["bpf_cpu_time_us", "bpf_page_cache_hit"] IRQ_FEATURES = ["bpf_block_irq", "bpf_net_rx_irq", "bpf_net_tx_irq"] -KUBELET_FEATURES =['kubelet_memory_bytes', 'kubelet_cpu_usage'] ACCELERATE_FEATURES = ['accelerator_intel_qat'] -WORKLOAD_FEATURES = COUNTER_FEAUTRES + CGROUP_FEATURES + BPF_FEATURES + IRQ_FEATURES + KUBELET_FEATURES + ACCELERATE_FEATURES -BASIC_FEATURES = COUNTER_FEAUTRES + CGROUP_FEATURES + BPF_FEATURES + KUBELET_FEATURES +WORKLOAD_FEATURES = COUNTER_FEAUTRES + CGROUP_FEATURES + BPF_FEATURES + IRQ_FEATURES + ACCELERATE_FEATURES +BASIC_FEATURES = COUNTER_FEAUTRES + CGROUP_FEATURES + BPF_FEATURES PowerSourceMap = { "intel_rapl": ["package", "core", "uncore", "dram"], @@ -44,13 +43,12 @@ class FeatureGroup(enum.Enum): CounterOnly = 3 CgroupOnly = 4 BPFOnly = 5 - KubeletOnly = 6 - IRQOnly = 7 - CounterIRQCombined = 8 - Basic = 9 - BPFIRQ = 10 - AcceleratorOnly = 11 - ThirdParty = 12 + IRQOnly = 6 + CounterIRQCombined = 7 + Basic = 8 + BPFIRQ = 9 + AcceleratorOnly = 10 + ThirdParty = 11 Unknown = 99 class EnergyComponentLabelGroup(enum.Enum): @@ -62,7 +60,6 @@ class EnergyComponentLabelGroup(enum.Enum): class ModelOutputType(enum.Enum): AbsPower = 1 DynPower = 2 - XGBoostStandalonePower = 3 def is_support_output_type(output_type_name): return any(output_type_name == item.name for item in ModelOutputType) @@ -78,14 +75,13 @@ def deep_sort(elements): FeatureGroup.CounterOnly: deep_sort(COUNTER_FEAUTRES), FeatureGroup.CgroupOnly: deep_sort(CGROUP_FEATURES), FeatureGroup.BPFOnly: deep_sort(BPF_FEATURES), - FeatureGroup.KubeletOnly: deep_sort(KUBELET_FEATURES), FeatureGroup.BPFIRQ: deep_sort(BPF_FEATURES + IRQ_FEATURES), FeatureGroup.CounterIRQCombined: deep_sort(COUNTER_FEAUTRES + IRQ_FEATURES), FeatureGroup.Basic: deep_sort(BASIC_FEATURES), FeatureGroup.AcceleratorOnly: deep_sort(ACCELERATE_FEATURES), } -SingleSourceFeatures = [FeatureGroup.CounterOnly.name, FeatureGroup.CgroupOnly.name, FeatureGroup.BPFOnly.name, FeatureGroup.BPFIRQ.name, FeatureGroup.KubeletOnly.name] +SingleSourceFeatures = [FeatureGroup.CounterOnly.name, FeatureGroup.CgroupOnly.name, FeatureGroup.BPFOnly.name, FeatureGroup.BPFIRQ.name] def is_single_source_feature_group(fg): return fg.name in SingleSourceFeatures @@ -96,7 +92,6 @@ def is_single_source_feature_group(fg): FeatureGroup.CounterOnly: "cpu_instructions", FeatureGroup.CgroupOnly: "cgroupfs_cpu_usage_us", FeatureGroup.BPFOnly: "bpf_cpu_time_us", - FeatureGroup.KubeletOnly: "kubelet_cpu_usage", FeatureGroup.BPFIRQ: "bpf_cpu_time_us", FeatureGroup.CounterIRQCombined: "cpu_instructions", FeatureGroup.Basic: "cpu_instructions", @@ -108,7 +103,6 @@ def is_single_source_feature_group(fg): FeatureGroup.CounterOnly: "cache_miss", FeatureGroup.CgroupOnly: "cgroupfs_memory_usage_bytes", FeatureGroup.BPFOnly: "bpf_page_cache_hit", - FeatureGroup.KubeletOnly: "kubelet_memory_bytes", FeatureGroup.BPFIRQ: "bpf_page_cache_hit", 
FeatureGroup.CounterIRQCombined: "cache_miss", FeatureGroup.Basic: "cache_miss" diff --git a/tests/e2e_test.sh b/tests/e2e_test.sh index 0702ca81..f6198280 100755 --- a/tests/e2e_test.sh +++ b/tests/e2e_test.sh @@ -43,6 +43,11 @@ get_server_log() { kubectl logs -n kepler $(get_component model-server) -c server-api } +get_db_log(){ + kubectl logs -n kepler model-db -c trainer + kubectl logs -n kepler model-db +} + wait_for_kepler() { kubectl rollout status ds kepler-exporter -n kepler --timeout 5m kubectl describe ds -n kepler kepler-exporter @@ -55,6 +60,13 @@ wait_for_server() { wait_for_keyword server "Press CTRL+C to quit" "server has not started yet" } +wait_for_db() { + kubectl get po model-db -n kepler + kubectl wait -n kepler --for=jsonpath='{.status.phase}'=Running pod/model-db --timeout 5m + wait_for_keyword db "Http File Serve Serving" "model-db is not serving" + get_db_log +} + wait_for_keyword() { num_iterations=10 component=$1 @@ -64,16 +76,23 @@ wait_for_keyword() { if grep -q "$keyword" <<< $(get_${component}_log); then return fi - kubectl get po -n kepler -oyaml sleep 2 done echo "timeout ${num_iterations}s waiting for '${keyword}' from ${component} log" echo "Error: $message" + kubectl get po -n kepler -oyaml + echo "${component} log:" get_${component}_log # show all status kubectl get po -A + if [ ! -z ${SERVER} ]; then + get_server_log + fi + if [ ! -z ${ESTIMATOR} ]; then + get_estimator_log + fi exit 1 } @@ -84,6 +103,7 @@ check_estimator_set_and_init() { restart_model_server() { kubectl delete po -l app.kubernetes.io/component=model-server -n kepler wait_for_server + get_server_log } test() { @@ -94,6 +114,22 @@ test() { for opt in ${DEPLOY_OPTIONS}; do export $opt=true; done; + # train and deploy local modelDB + kubectl apply -f ${top_dir}/manifests/test/file-server.yaml + sleep 10 + wait_for_db + + # patch MODEL_TOPURL environment + if [ ! -z ${ESTIMATOR} ]; then + kubectl patch configmap -n kepler kepler-cfm --type merge -p "$(cat ${top_dir}/manifests/test/patch-estimator-sidecar.yaml)" + kubectl patch ds kepler-exporter -n kepler -p '{"spec":{"template":{"spec":{"containers":[{"name":"estimator","env":[{"name":"MODEL_TOPURL","value":"http://model-db.kepler.svc.cluster.local:8110"}]}]}}}}' + fi + if [ ! -z ${SERVER} ]; then + kubectl patch deploy kepler-model-server -n kepler -p '{"spec":{"template":{"spec":{"containers":[{"name":"server-api","env":[{"name":"MODEL_TOPURL","value":"http://model-db.kepler.svc.cluster.local:8110"}]}]}}}}' + kubectl delete po -n kepler -l app.kubernetes.io/component=model-server + fi + kubectl delete po -n kepler -l app.kubernetes.io/component=exporter + if [ ! -z ${ESTIMATOR} ]; then # with estimator if [ ! 
-z ${TEST} ]; then diff --git a/tests/estimator_model_request_test.py b/tests/estimator_model_request_test.py index 3bf2dd6f..2c290c88 100644 --- a/tests/estimator_model_request_test.py +++ b/tests/estimator_model_request_test.py @@ -16,6 +16,12 @@ import os import sys +file_server_port = 8110 +# set environment +os.environ['MODEL_SERVER_URL'] = 'http://localhost:8100' +model_topurl = 'http://localhost:{}'.format(file_server_port) +os.environ['MODEL_TOPURL'] = model_topurl + server_path = os.path.join(os.path.dirname(__file__), '../src') util_path = os.path.join(os.path.dirname(__file__), '../src/util') train_path = os.path.join(os.path.dirname(__file__), '../src/train') @@ -28,6 +34,7 @@ sys.path.append(prom_path) sys.path.append(estimate_path) +from http_server import http_file_server from train_types import FeatureGroups, FeatureGroup, ModelOutputType from loader import get_download_output_path from estimate.estimator import handle_request, loaded_model, PowerRequest @@ -38,7 +45,7 @@ from estimator_power_request_test import generate_request -os.environ['MODEL_SERVER_URL'] = 'http://localhost:8100' +http_file_server(file_server_port) import json @@ -72,17 +79,17 @@ if url != "": print("Download: ", url) response = requests.get(url) - if response.status_code == 200: - output_path = get_download_output_path(download_path, energy_source, output_type) - if output_type_name in loaded_model and energy_source in loaded_model[output_type.name]: - del loaded_model[output_type_name][energy_source] - if os.path.exists(output_path): - shutil.rmtree(output_path) - request_json = generate_request(None, n=10, metrics=FeatureGroups[FeatureGroup.Full], output_type=output_type_name) - data = json.dumps(request_json) - output = handle_request(data) - assert len(output['powers']) > 0, "cannot get power {}\n {}".format(output['msg'], request_json) - print("result from {}: {}".format(url, output)) + assert response.status_code == 200, "init url must be set and valid" + output_path = get_download_output_path(download_path, energy_source, output_type) + if output_type_name in loaded_model and energy_source in loaded_model[output_type.name]: + del loaded_model[output_type_name][energy_source] + if os.path.exists(output_path): + shutil.rmtree(output_path) + request_json = generate_request(None, n=10, metrics=FeatureGroups[FeatureGroup.Full], output_type=output_type_name) + data = json.dumps(request_json) + output = handle_request(data) + assert len(output['powers']) > 0, "cannot get power {}\n {}".format(output['msg'], request_json) + print("result from {}: {}".format(url, output)) output_type_name = 'AbsPower' estimator_enable_key = "NODE_COMPONENTS_ESTIMATOR" @@ -99,23 +106,23 @@ if os.path.exists(output_path): shutil.rmtree(output_path) # valid model - os.environ[init_url_key] = get_url(output_type=output_type, feature_group=FeatureGroup.KubeletOnly) + os.environ[init_url_key] = get_url(output_type=output_type, feature_group=FeatureGroup.CgroupOnly, model_topurl=model_topurl) print("Requesting from ", os.environ[init_url_key]) - request_json = generate_request(None, n=10, metrics=FeatureGroups[FeatureGroup.KubeletOnly], output_type=output_type_name) + request_json = generate_request(None, n=10, metrics=FeatureGroups[FeatureGroup.CgroupOnly], output_type=output_type_name) data = json.dumps(request_json) output = handle_request(data) assert len(output['powers']) > 0, "cannot get power {}\n {}".format(output['msg'], request_json) - print("result {}/{} from static set: {}".format(output_type_name, 
FeatureGroup.KubeletOnly.name, output)) + print("result {}/{} from static set: {}".format(output_type_name, FeatureGroup.CgroupOnly.name, output)) del loaded_model[output_type_name][energy_source] # invalid model - os.environ[init_url_key] = get_url(output_type=output_type, feature_group=FeatureGroup.BPFOnly) + os.environ[init_url_key] = get_url(output_type=output_type, feature_group=FeatureGroup.BPFOnly, model_topurl=model_topurl) print("Requesting from ", os.environ[init_url_key]) - request_json = generate_request(None, n=10, metrics=FeatureGroups[FeatureGroup.KubeletOnly], output_type=output_type_name) + request_json = generate_request(None, n=10, metrics=FeatureGroups[FeatureGroup.CgroupOnly], output_type=output_type_name) data = json.dumps(request_json) power_request = json.loads(data, object_hook = lambda d : PowerRequest(**d)) output_path = get_achived_model(power_request) assert output_path is None, "model should be invalid\n {}".format(output_path) - os.environ['MODEL_CONFIG'] = "{}=true\n{}={}\n".format(estimator_enable_key,init_url_key,get_url(output_type=output_type, feature_group=FeatureGroup.KubeletOnly)) + os.environ['MODEL_CONFIG'] = "{}=true\n{}={}\n".format(estimator_enable_key,init_url_key,get_url(output_type=output_type, feature_group=FeatureGroup.CgroupOnly, model_topurl=model_topurl)) set_env_from_model_config() print("Requesting from ", os.environ[init_url_key]) reset_failed_list() @@ -124,7 +131,7 @@ output_path = get_download_output_path(download_path, energy_source, output_type) if os.path.exists(output_path): shutil.rmtree(output_path) - request_json = generate_request(None, n=10, metrics=FeatureGroups[FeatureGroup.KubeletOnly], output_type=output_type_name) + request_json = generate_request(None, n=10, metrics=FeatureGroups[FeatureGroup.CgroupOnly], output_type=output_type_name) data = json.dumps(request_json) output = handle_request(data) assert len(output['powers']) > 0, "cannot get power {}\n {}".format(output['msg'], request_json) \ No newline at end of file diff --git a/tests/http_server.py b/tests/http_server.py new file mode 100644 index 00000000..822ef56e --- /dev/null +++ b/tests/http_server.py @@ -0,0 +1,46 @@ +# import external src +import http.server +import socketserver +import atexit +import threading + +import os +import sys + +################################################################# +# import internal src +src_path = os.path.join(os.path.dirname(__file__), '..', 'src') +sys.path.append(src_path) +################################################################# + +from util.config import model_toppath + +os.chdir(model_toppath) + +def cleanup_task(server): + print("Shutdown server...") + server.shutdown() + +def get_server(file_server_port): + Handler = http.server.SimpleHTTPRequestHandler + httpd = socketserver.TCPServer(("", file_server_port), Handler) + + # Register the cleanup task to be executed on program exit + atexit.register(cleanup_task, httpd) + + print("Http File Serve Serving at Port", file_server_port, " for ", model_toppath) + return httpd + +def http_file_server(file_server_port): + try: + httpd = get_server(file_server_port) + # Start the server in a separate thread + server_thread = threading.Thread(target=httpd.serve_forever) + server_thread.daemon = True + server_thread.start() + except Exception as err: + print("File server is running: {}".format(err)) + +if __name__ == "__main__": + httpd = get_server(8110) + httpd.serve_forever() \ No newline at end of file diff --git a/tests/minimal_trainer.py 
b/tests/minimal_trainer.py new file mode 100644 index 00000000..cd2b2e70 --- /dev/null +++ b/tests/minimal_trainer.py @@ -0,0 +1,18 @@ +import os +import sys + +################################################################# +# import internal src +src_path = os.path.join(os.path.dirname(__file__), '..', 'src') +sys.path.append(src_path) +################################################################# + +from pipeline_test import process + +from util import PowerSourceMap, FeatureGroup + +trainer_names = [ 'GradientBoostingRegressorTrainer', 'SGDRegressorTrainer', 'XgboostFitTrainer' ] +valid_feature_groups = [ FeatureGroup.BPFOnly, FeatureGroup.CgroupOnly ] + +if __name__ == '__main__': + process(target_energy_sources=PowerSourceMap.keys(), abs_trainer_names=trainer_names, dyn_trainer_names=trainer_names, valid_feature_groups=valid_feature_groups) \ No newline at end of file diff --git a/tests/pipeline_test.py b/tests/pipeline_test.py index dcdd5a01..81eaded3 100644 --- a/tests/pipeline_test.py +++ b/tests/pipeline_test.py @@ -26,15 +26,21 @@ def assert_pipeline(pipeline, query_results, feature_group, energy_source, energ else: assert_train(trainer, dyn_data, energy_components) -def process(save_pipeline_name=DEFAULT_PIPELINE, prom_save_path=prom_output_path, prom_save_name=prom_output_filename, abs_trainer_names=test_trainer_names, dyn_trainer_names=test_trainer_names, extractors=test_extractors, isolators=test_isolators, target_energy_sources=[test_energy_source]): +def process(save_pipeline_name=DEFAULT_PIPELINE, prom_save_path=prom_output_path, prom_save_name=prom_output_filename, abs_trainer_names=test_trainer_names, dyn_trainer_names=test_trainer_names, extractors=test_extractors, isolators=test_isolators, target_energy_sources=[test_energy_source], valid_feature_groups=None): query_results = get_query_results(save_path=prom_save_path, save_name=prom_save_name) - valid_feature_groups = get_valid_feature_group_from_queries(query_results.keys()) + if valid_feature_groups is None: + valid_feature_groups = get_valid_feature_group_from_queries(query_results.keys()) for extractor in extractors: for isolator in isolators: - energy_components = PowerSourceMap[test_energy_source] pipeline = NewPipeline(save_pipeline_name, abs_trainer_names, dyn_trainer_names, extractor=extractor, isolator=isolator, target_energy_sources=target_energy_sources ,valid_feature_groups=valid_feature_groups) - for feature_group in valid_feature_groups: - assert_pipeline(pipeline, query_results, feature_group, test_energy_source, energy_components) + for energy_source in target_energy_sources: + energy_components = PowerSourceMap[energy_source] + for feature_group in valid_feature_groups: + assert_pipeline(pipeline, query_results, feature_group, energy_source, energy_components) + # save metadata + pipeline.save_metadata() + # save pipeline + pipeline.archive_pipeline() if __name__ == '__main__': - process() \ No newline at end of file + process(target_energy_sources=PowerSourceMap.keys()) \ No newline at end of file diff --git a/tests/trainer_test.py b/tests/trainer_test.py index bfea1007..5e8bc412 100644 --- a/tests/trainer_test.py +++ b/tests/trainer_test.py @@ -3,6 +3,8 @@ import os import sys +import sklearn + ################################################################# # import internal src src_path = os.path.join(os.path.dirname(__file__), '..', 'src') @@ -30,9 +32,12 @@ def assert_train(trainer, data, energy_components): node_type_filtered_data = data[data[node_info_column] == node_type] X_values = 
node_type_filtered_data[trainer.features].values for component in energy_components: - output = trainer.predict(node_type_str, component, X_values) - assert len(output) == len(X_values), "length of predicted values != features ({}!={})".format(len(output), len(X_values)) - + try: + output = trainer.predict(node_type_str, component, X_values) + assert len(output) == len(X_values), "length of predicted values != features ({}!={})".format(len(output), len(X_values)) + except sklearn.exceptions.NotFittedError: + pass + def process(node_level, feature_group, result, trainer_names=test_trainer_names, energy_source=test_energy_source, power_columns=get_expected_power_columns(), pipeline_name=DEFAULT_PIPELINE): energy_components = PowerSourceMap[energy_source] train_items = []