From dbea49b17b1168487a38929ea2ee9c1e019a8453 Mon Sep 17 00:00:00 2001 From: Fiona Waters Date: Mon, 4 Nov 2024 00:01:14 +0000 Subject: [PATCH] Refactoring to remove use of image and add training-operator funcs Signed-off-by: Fiona Waters --- .../kubeflow/pytorch-launcher/Dockerfile | 23 --- .../kubeflow/pytorch-launcher/build_image.sh | 46 ----- .../pytorch-launcher/requirements.txt | 5 - .../kubeflow/pytorch-launcher/sample.py | 180 +++++------------- .../kubeflow/pytorch-launcher/src/__init__.py | 13 -- .../pytorch-launcher/src/launch_pytorchjob.py | 139 -------------- 6 files changed, 44 insertions(+), 362 deletions(-) delete mode 100644 components/kubeflow/pytorch-launcher/Dockerfile delete mode 100755 components/kubeflow/pytorch-launcher/build_image.sh delete mode 100644 components/kubeflow/pytorch-launcher/requirements.txt delete mode 100644 components/kubeflow/pytorch-launcher/src/__init__.py delete mode 100644 components/kubeflow/pytorch-launcher/src/launch_pytorchjob.py diff --git a/components/kubeflow/pytorch-launcher/Dockerfile b/components/kubeflow/pytorch-launcher/Dockerfile deleted file mode 100644 index 16c105acddc..00000000000 --- a/components/kubeflow/pytorch-launcher/Dockerfile +++ /dev/null @@ -1,23 +0,0 @@ -# Copyright 2021 The Kubeflow Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -FROM python:3.11 - -ADD requirements.txt requirements.txt - -RUN pip install --no-cache-dir -r requirements.txt - -ADD build /ml - -ENTRYPOINT ["python", "/ml/src/launch_pytorchjob.py"] diff --git a/components/kubeflow/pytorch-launcher/build_image.sh b/components/kubeflow/pytorch-launcher/build_image.sh deleted file mode 100755 index b036d79a972..00000000000 --- a/components/kubeflow/pytorch-launcher/build_image.sh +++ /dev/null @@ -1,46 +0,0 @@ -#!/bin/bash -LAUNCHER_IMAGE_NAME_DEFAULT=kubeflow-pytorchjob-launcher - -while getopts ":hr:t:i:" opt; do - case "${opt}" in - h) echo "-r: repo name (including gcr.io/, etc., if not in Docker Hub)" - echo "-i: image name (default is $LAUNCHER_IMAGE_NAME_DEFAULT)" - echo "-t: image tag (default is inferred from date/git)" - exit - ;; - r) REPO_NAME=${OPTARG} - ;; - t) TAG_NAME=${OPTARG} - ;; - i) LAUNCHER_IMAGE_NAME=${OPTARG} - ;; - \? ) echo "Usage: cmd [-p] project [-t] tag [-i] image" - exit - ;; - esac -done - -# Apply defaults/interpret inputs -LAUNCHER_IMAGE_NAME=${LAUNCHER_IMAGE_NAME:-$LAUNCHER_IMAGE_NAME_DEFAULT} -TAG_NAME=${TAG_NAME:-$(date +v%Y%m%d)-$(git describe --tags --always --dirty)-$(git diff | shasum -a256 | cut -c -6)} - -if [ -n "${REPO_NAME}" ]; then - # Ensure ends with / - if [[ "$REPO_NAME" != */ ]]; then - REPO_NAME+=/ - fi -fi - -FULL_NAME=${REPO_NAME}${LAUNCHER_IMAGE_NAME}:${TAG_NAME} - -mkdir -p ./build -cp -R ./src/ ./build/ -cp -R ../common/ ./build/ - -echo "Building image $FULL_NAME" -docker build -t ${FULL_NAME} . 
- -echo "Pushing image $FULL_NAME" -docker push ${FULL_NAME} - -rm -rf ./build diff --git a/components/kubeflow/pytorch-launcher/requirements.txt b/components/kubeflow/pytorch-launcher/requirements.txt deleted file mode 100644 index eab1665a2fe..00000000000 --- a/components/kubeflow/pytorch-launcher/requirements.txt +++ /dev/null @@ -1,5 +0,0 @@ -pyyaml<7,>=5.3 -kubernetes<31,>=8.0.0 -kubeflow-training>=1.8.0 -retrying>=1.3.3 -str2bool==1.1 diff --git a/components/kubeflow/pytorch-launcher/sample.py b/components/kubeflow/pytorch-launcher/sample.py index 6644200d863..b17f5934e37 100644 --- a/components/kubeflow/pytorch-launcher/sample.py +++ b/components/kubeflow/pytorch-launcher/sample.py @@ -1,7 +1,11 @@ import kfp from kfp import dsl -from typing import Optional import uuid +import datetime +import logging +from kubernetes import config +from kubeflow.training import TrainingClient +from kubeflow.training.utils import utils def get_current_namespace(): @@ -15,137 +19,52 @@ def get_current_namespace(): return current_namespace -@dsl.component() -def create_master_spec() -> dict: - # Define master spec - master = { - "replicas": 1, - "restartPolicy": "OnFailure", - "template": { - "metadata": { - "annotations": { - # See https://github.com/kubeflow/website/issues/2011 - "sidecar.istio.io/inject": "false" - } - }, - "spec": { - "containers": [ - { - "args": [ - "--backend", - "gloo", - ], - "image": "public.ecr.aws/pytorch-samples/pytorch_dist_mnist:latest", - "name": "pytorch", - "resources": { - "requests": { - "memory": "4Gi", - "cpu": "2000m", - }, - "limits": { - "memory": "4Gi", - "cpu": "2000m", - }, - }, - } - ], - }, - }, - } - - return master - - -@dsl.component -def create_worker_spec( - worker_num: int = 0 -) -> dict: - """ - Creates pytorch-job worker spec - """ - worker = {} - if worker_num > 0: - worker = { - "replicas": worker_num, - "restartPolicy": "OnFailure", - "template": { - "metadata": { - "annotations": { - "sidecar.istio.io/inject": "false" - } - }, - "spec": { - "containers": [ - { - "args": [ - "--backend", - "gloo", - ], - "image": "public.ecr.aws/pytorch-samples/pytorch_dist_mnist:latest", - "name": "pytorch", - "resources": { - "requests": { - "memory": "4Gi", - "cpu": "2000m", - # Uncomment for GPU - # "nvidia.com/gpu": 1, - }, - "limits": { - "memory": "4Gi", - "cpu": "2000m", - # Uncomment for GPU - # "nvidia.com/gpu": 1, - }, - }, - } - ] - }, - }, - } - - return worker - -# container component description setting inputs and implementation -@dsl.container_component +@dsl.component(packages_to_install=['kubernetes<31,>=8.0.0', 'kubeflow-training>=1.8.0', 'retrying>=1.3.3']) def pytorch_job_launcher( name: str, kind: str = "PyTorchJob", namespace: str = "kubeflow", - version: str = 'v2', - master_spec: dict = {}, - worker_spec: dict = {}, + worker_replicas: int = 1, job_timeout_minutes: int = 1440, delete_after_done: bool = True, - clean_pod_policy: str = 'Running', - active_deadline_seconds: Optional[int] = None, - backoff_limit: Optional[int] = None, - ttl_seconds_after_finished: Optional[int] = None, ): - command_args = [ - '--name', name, - '--kind', kind, - '--namespace', namespace, - '--version', version, - '--masterSpec', master_spec, - '--workerSpec', worker_spec, - '--jobTimeoutMinutes', job_timeout_minutes, - '--deleteAfterDone', delete_after_done, - '--cleanPodPolicy', clean_pod_policy,] - if active_deadline_seconds is not None and isinstance(active_deadline_seconds, int): - command_args.append(['--activeDeadlineSeconds', 
str(active_deadline_seconds)])
-    if backoff_limit is not None and isinstance(backoff_limit, int):
-        command_args.append(['--backoffLimit', str(backoff_limit)])
-    if ttl_seconds_after_finished is not None and isinstance(ttl_seconds_after_finished, int):
-        command_args.append(['--ttlSecondsAfterFinished', str(ttl_seconds_after_finished)])
+    args = ["--backend", "gloo"]
+    resources = {"cpu": "2000m", "memory": "4Gi"} # add "nvidia.com/gpu": 1, for GPU
+    base_image = "public.ecr.aws/pytorch-samples/pytorch_dist_mnist:latest"
+
+    container_spec = utils.get_container_spec(base_image=base_image, name="pytorch", resources=resources, args=args)
+    spec = utils.get_pod_template_spec(containers=[container_spec])
+    job_template = utils.get_pytorchjob_template(name=name, namespace=namespace, num_workers=worker_replicas, worker_pod_template_spec=spec, master_pod_template_spec=spec)
+
+
+    logging.getLogger(__name__).setLevel(logging.INFO)
+    logging.info('Generating job template.')
+
+    logging.info('Creating TrainingClient.')
+
+    # remove one of these depending on where you are running this
+    config.load_incluster_config()
+    #config.load_kube_config()
 
-    return dsl.ContainerSpec(
-        # In order to build the image that will be used in the container, use the build_image.sh script
-        # providing the required parameters. This will build the image and push it to your image repository.
-        # Example: export REPO_NAME="" export LAUNCHER_IMAGE_NAME="" export TAG_NAME="" ./build_image.sh
-        image='', # replace this with your image url
-        command=['python', '/ml/src/launch_pytorchjob.py'],
-        args=command_args
+    training_client = TrainingClient()
+
+    logging.info(f"Creating PyTorchJob in namespace: {namespace}")
+    training_client.create_job(job_template, namespace=namespace)
+
+    expected_conditions = ["Succeeded", "Failed"]
+    logging.info(
+        f'Monitoring job until status is any of {expected_conditions}.'
+    )
+    training_client.wait_for_job_conditions(
+        name=name,
+        namespace=namespace,
+        job_kind=kind,
+        expected_conditions=set(expected_conditions),
+        timeout=int(datetime.timedelta(minutes=job_timeout_minutes).total_seconds())
     )
+    if delete_after_done:
+        logging.info('Deleting job after completion.')
+        training_client.delete_job(name, namespace)
 
 
 @dsl.pipeline(
@@ -155,27 +74,15 @@ def pytorch_job_launcher(
 def pytorch_job_pipeline(
     kind: str = "PyTorchJob",
     worker_replicas: int = 1,
-    ttl_seconds_after_finished: int = 3600,
-    job_timeout_minutes: int = 1440,
-    delete_after_done: bool = True,
-    clean_pod_policy: str ="Running"
 ):
     namespace = get_current_namespace()
-    worker_spec = create_worker_spec(worker_num=worker_replicas)
-    master_spec = create_master_spec()
 
     result = pytorch_job_launcher(
         name=f"mnist-train-{uuid.uuid4().hex[:8]}",
         kind=kind,
        namespace=namespace,
-        version="v2",
-        worker_spec=worker_spec.output,
-        master_spec=master_spec.output,
-        ttl_seconds_after_finished=ttl_seconds_after_finished,
-        job_timeout_minutes=job_timeout_minutes,
-        delete_after_done=delete_after_done,
-        clean_pod_policy=clean_pod_policy,
+        worker_replicas=worker_replicas
     )
 
 
diff --git a/components/kubeflow/pytorch-launcher/src/__init__.py b/components/kubeflow/pytorch-launcher/src/__init__.py
deleted file mode 100644
index b4447dd5838..00000000000
--- a/components/kubeflow/pytorch-launcher/src/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Copyright 2021 The Kubeflow Authors
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/components/kubeflow/pytorch-launcher/src/launch_pytorchjob.py b/components/kubeflow/pytorch-launcher/src/launch_pytorchjob.py deleted file mode 100644 index f8269647eda..00000000000 --- a/components/kubeflow/pytorch-launcher/src/launch_pytorchjob.py +++ /dev/null @@ -1,139 +0,0 @@ -import argparse -import datetime -from str2bool import str2bool -import logging -import yaml - -from kubernetes import client as k8s_client -from kubernetes import config - -from kubeflow.training import TrainingClient -from kubeflow.training import KubeflowOrgV1RunPolicy -from kubeflow.training import KubeflowOrgV1PyTorchJob -from kubeflow.training import KubeflowOrgV1PyTorchJobSpec - - -def yamlOrJsonStr(string): - if string == "" or string is None: - return None - return yaml.safe_load(string) - - -def get_current_namespace(): - """Returns current namespace if available, else kubeflow""" - try: - namespace = "/var/run/secrets/kubernetes.io/serviceaccount/namespace" - current_namespace = open(namespace).read() - except FileNotFoundError: - current_namespace = "kubeflow" - return current_namespace - - -def get_arg_parser(): - parser = argparse.ArgumentParser(description='Kubeflow Job launcher') - parser.add_argument('--name', type=str, - default="pytorchjob", - help='Job name.') - parser.add_argument('--namespace', type=str, - default=get_current_namespace(), - help='Job namespace.') - parser.add_argument('--version', type=str, - default='v1', - help='Job version.') - parser.add_argument('--activeDeadlineSeconds', type=int, - default=None, - help='Specifies the duration (in seconds) since startTime during which the job can remain active before it is terminated. Must be a positive integer. 
This setting applies only to pods where restartPolicy is OnFailure or Always.') - parser.add_argument('--backoffLimit', type=int, - default=None, - help='Number of retries before marking this job as failed.') - parser.add_argument('--cleanPodPolicy', type=str, - default="Running", - help='Defines the policy for cleaning up pods after the Job completes.') - parser.add_argument('--ttlSecondsAfterFinished', type=int, - default=None, - help='Defines the TTL for cleaning up finished Jobs.') - parser.add_argument('--masterSpec', type=yamlOrJsonStr, - default={}, - help='Job master replicaSpecs.') - parser.add_argument('--workerSpec', type=yamlOrJsonStr, - default={}, - help='Job worker replicaSpecs.') - parser.add_argument('--deleteAfterDone', type=str2bool, - default=True, - help='When Job done, delete the Job automatically if it is True.') - parser.add_argument('--jobTimeoutMinutes', type=int, - default=60*24, - help='Time in minutes to wait for the Job to reach end') - - # Options that likely wont be used, but left here for future use - parser.add_argument('--jobGroup', type=str, - default="kubeflow.org", - help='Group for the CRD, ex: kubeflow.org') - parser.add_argument('--jobPlural', type=str, - default="pytorchjobs", # We could select a launcher here and populate these automatically - help='Plural name for the CRD, ex: pytorchjobs') - parser.add_argument('--kind', type=str, - default='PyTorchJob', - help='CRD kind.') - return parser - - -def main(args): - logging.getLogger(__name__).setLevel(logging.INFO) - logging.info('Generating job template.') - - jobSpec = KubeflowOrgV1PyTorchJobSpec( - pytorch_replica_specs={ - 'Master': args.masterSpec, - 'Worker': args.workerSpec, - }, - run_policy=KubeflowOrgV1RunPolicy( - active_deadline_seconds=args.activeDeadlineSeconds, - backoff_limit=args.backoffLimit, - clean_pod_policy=args.cleanPodPolicy, - ttl_seconds_after_finished=args.ttlSecondsAfterFinished, - ) - ) - - api_version = f"{args.jobGroup}/{args.version}" - - job = KubeflowOrgV1PyTorchJob( - api_version=api_version, - kind=args.kind, - metadata=k8s_client.V1ObjectMeta( - name=args.name, - namespace=args.namespace, - ), - spec=jobSpec, - ) - logging.info('Creating TrainingClient.') - - # remove one of these depending on where you are running this - config.load_incluster_config() - #config.load_kube_config() - - training_client = TrainingClient() - - logging.info(f"Creating PyTorchJob in namespace: {args.namespace}") - training_client.create_job(job, namespace=args.namespace) - - expected_conditions = ["Succeeded", "Failed"] - logging.info( - f'Monitoring job until status is any of {expected_conditions}.' - ) - training_client.wait_for_job_conditions( - name=args.name, - namespace=args.namespace, - job_kind=args.kind, - expected_conditions=set(expected_conditions), - timeout=int(datetime.timedelta(minutes=args.jobTimeoutMinutes).total_seconds()) - ) - if args.deleteAfterDone: - logging.info('Deleting job after completion.') - training_client.delete_job(args.name, args.namespace) - - -if __name__ == "__main__": - parser = get_arg_parser() - args = parser.parse_args() - main(args)
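
Note: the refactored launcher component calls config.load_incluster_config() and leaves config.load_kube_config() commented out, so the right call has to be picked by hand depending on where the code runs. A minimal sketch of how the two could be combined, assuming only the standard kubernetes Python client; the helper name load_k8s_config is illustrative and not part of this patch:

from kubernetes import config


def load_k8s_config() -> None:
    """Prefer the in-cluster service-account config, fall back to the local kubeconfig."""
    try:
        # Works when running inside a pod, as the launched KFP component does.
        config.load_incluster_config()
    except config.ConfigException:
        # Works when running the sample locally against a remote cluster.
        config.load_kube_config()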
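With the custom launcher image and build_image.sh removed, the pipeline in sample.py can be compiled and submitted with the stock KFP SDK, since the component installs its own dependencies via packages_to_install. A usage sketch, assuming sample.py is on the import path and a reachable KFP endpoint; the file name, host URL, and output path are placeholders, not part of this patch:

import kfp
from kfp import compiler

from sample import pytorch_job_pipeline  # assumes sample.py is importable

# Compile the pipeline to IR YAML for upload through the KFP UI or API.
compiler.Compiler().compile(
    pipeline_func=pytorch_job_pipeline,
    package_path="pytorch_job_pipeline.yaml",
)

# Or submit a run directly; host and credentials depend on the deployment.
client = kfp.Client(host="http://ml-pipeline-ui.kubeflow:80")  # placeholder endpoint
client.create_run_from_pipeline_func(
    pytorch_job_pipeline,
    arguments={"worker_replicas": 1},
)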