Commit

Refactoring to remove use of image and add training-operator funcs

Signed-off-by: Fiona Waters <[email protected]>
Fiona-Waters committed Nov 4, 2024
1 parent 607e669 commit dbea49b
Showing 6 changed files with 44 additions and 362 deletions.
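At its core, the change swaps the image-based launcher for direct calls into the kubeflow-training SDK. The sketch below restates the create/wait/delete flow from the new sample.py in one place, since the interleaved diff further down is hard to read. Names and values mirror the diff, but the wrapper function itself is illustrative; treat this as a condensed sketch rather than the exact committed code.

import datetime

from kubernetes import config
from kubeflow.training import TrainingClient
from kubeflow.training.utils import utils


def launch_pytorchjob(name: str, namespace: str, worker_replicas: int = 1,
                      job_timeout_minutes: int = 1440, delete_after_done: bool = True):
    # Build the PyTorchJob manifest with SDK helpers instead of hand-written spec dicts.
    container = utils.get_container_spec(
        base_image="public.ecr.aws/pytorch-samples/pytorch_dist_mnist:latest",
        name="pytorch",
        resources={"cpu": "2000m", "memory": "4Gi"},
        args=["--backend", "gloo"],
    )
    pod_template = utils.get_pod_template_spec(containers=[container])
    job = utils.get_pytorchjob_template(
        name=name,
        namespace=namespace,
        num_workers=worker_replicas,
        master_pod_template_spec=pod_template,
        worker_pod_template_spec=pod_template,
    )

    config.load_incluster_config()  # the component runs inside the cluster
    client = TrainingClient()

    # Create the job, block until it succeeds or fails, then optionally clean up.
    client.create_job(job, namespace=namespace)
    client.wait_for_job_conditions(
        name=name,
        namespace=namespace,
        job_kind="PyTorchJob",
        expected_conditions={"Succeeded", "Failed"},
        timeout=int(datetime.timedelta(minutes=job_timeout_minutes).total_seconds()),
    )
    if delete_after_done:
        client.delete_job(name, namespace)
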
23 changes: 0 additions & 23 deletions components/kubeflow/pytorch-launcher/Dockerfile

This file was deleted.

46 changes: 0 additions & 46 deletions components/kubeflow/pytorch-launcher/build_image.sh

This file was deleted.

5 changes: 0 additions & 5 deletions components/kubeflow/pytorch-launcher/requirements.txt

This file was deleted.
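All three deleted files existed only to build and pin a standalone launcher image. With the launcher rewritten as a lightweight Python-based component, the dependencies are declared on the component decorator and installed when the step starts, so no custom image is needed. Roughly, as the sample.py diff below shows (the signature here is abbreviated):

from kfp import dsl

# Dependencies formerly pinned in requirements.txt and baked into the launcher
# image are now installed at runtime inside the step's container.
@dsl.component(packages_to_install=[
    'kubernetes<31,>=8.0.0',
    'kubeflow-training>=1.8.0',
    'retrying>=1.3.3',
])
def pytorch_job_launcher(name: str, namespace: str = "kubeflow"):
    from kubeflow.training import TrainingClient  # available via packages_to_install
    ...  # full launcher logic shown in the sample.py diff below
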

180 changes: 44 additions & 136 deletions components/kubeflow/pytorch-launcher/sample.py
@@ -1,7 +1,11 @@
import kfp
from kfp import dsl
from typing import Optional
import uuid
import datetime
import logging
from kubernetes import config
from kubeflow.training import TrainingClient
from kubeflow.training.utils import utils


def get_current_namespace():
@@ -15,137 +19,52 @@ def get_current_namespace():
return current_namespace
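The body of get_current_namespace() is collapsed in the hunk above. A hypothetical sketch of the usual pattern, assuming the helper follows the common Kubernetes convention of reading the mounted service-account namespace file with a fallback:

def get_current_namespace() -> str:
    # Inside a pod, the namespace is mounted at this well-known path.
    path = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
    try:
        with open(path) as f:
            return f.read().strip()
    except FileNotFoundError:
        # Running outside a cluster (e.g. local testing): fall back to a default.
        return "kubeflow"
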


@dsl.component()
def create_master_spec() -> dict:
# Define master spec
master = {
"replicas": 1,
"restartPolicy": "OnFailure",
"template": {
"metadata": {
"annotations": {
# See https://github.com/kubeflow/website/issues/2011
"sidecar.istio.io/inject": "false"
}
},
"spec": {
"containers": [
{
"args": [
"--backend",
"gloo",
],
"image": "public.ecr.aws/pytorch-samples/pytorch_dist_mnist:latest",
"name": "pytorch",
"resources": {
"requests": {
"memory": "4Gi",
"cpu": "2000m",
},
"limits": {
"memory": "4Gi",
"cpu": "2000m",
},
},
}
],
},
},
}

return master


@dsl.component
def create_worker_spec(
worker_num: int = 0
) -> dict:
"""
Creates pytorch-job worker spec
"""
worker = {}
if worker_num > 0:
worker = {
"replicas": worker_num,
"restartPolicy": "OnFailure",
"template": {
"metadata": {
"annotations": {
"sidecar.istio.io/inject": "false"
}
},
"spec": {
"containers": [
{
"args": [
"--backend",
"gloo",
],
"image": "public.ecr.aws/pytorch-samples/pytorch_dist_mnist:latest",
"name": "pytorch",
"resources": {
"requests": {
"memory": "4Gi",
"cpu": "2000m",
# Uncomment for GPU
# "nvidia.com/gpu": 1,
},
"limits": {
"memory": "4Gi",
"cpu": "2000m",
# Uncomment for GPU
# "nvidia.com/gpu": 1,
},
},
}
]
},
},
}

return worker

# Container component definition: inputs and implementation
@dsl.container_component
@dsl.component(packages_to_install=['kubernetes<31,>=8.0.0', 'kubeflow-training>=1.8.0', 'retrying>=1.3.3'])
def pytorch_job_launcher(
name: str,
kind: str = "PyTorchJob",
namespace: str = "kubeflow",
version: str = 'v2',
master_spec: dict = {},
worker_spec: dict = {},
worker_replicas: int = 1,
job_timeout_minutes: int = 1440,
delete_after_done: bool = True,
clean_pod_policy: str = 'Running',
active_deadline_seconds: Optional[int] = None,
backoff_limit: Optional[int] = None,
ttl_seconds_after_finished: Optional[int] = None,
):
command_args = [
'--name', name,
'--kind', kind,
'--namespace', namespace,
'--version', version,
'--masterSpec', master_spec,
'--workerSpec', worker_spec,
'--jobTimeoutMinutes', job_timeout_minutes,
'--deleteAfterDone', delete_after_done,
'--cleanPodPolicy', clean_pod_policy,]
if active_deadline_seconds is not None and isinstance(active_deadline_seconds, int):
command_args.append(['--activeDeadlineSeconds', str(active_deadline_seconds)])
if backoff_limit is not None and isinstance(backoff_limit, int):
command_args.append(['--backoffLimit', str(backoff_limit)])
if ttl_seconds_after_finished is not None and isinstance(ttl_seconds_after_finished, int):
command_args.append(['--ttlSecondsAfterFinished', str(ttl_seconds_after_finished)])
# Build the PyTorchJob manifest with kubeflow-training SDK helpers instead of hand-written spec dicts.
args = ["--backend", "gloo"]
resources = {"cpu": "2000m", "memory": "4Gi"}  # add "nvidia.com/gpu": 1 for GPU
base_image = "public.ecr.aws/pytorch-samples/pytorch_dist_mnist:latest"

container_spec = utils.get_container_spec(base_image=base_image, name="pytorch", resources=resources, args=args)
spec = utils.get_pod_template_spec(containers=[container_spec])
job_template = utils.get_pytorchjob_template(
    name=name,
    namespace=namespace,
    num_workers=worker_replicas,
    worker_pod_template_spec=spec,
    master_pod_template_spec=spec,
)


logging.getLogger(__name__).setLevel(logging.INFO)
logging.info('Generating job template.')

logging.info('Creating TrainingClient.')

# Keep exactly one of these, depending on where the component is running:
config.load_incluster_config()
# config.load_kube_config()
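# Illustrative alternative (not in this commit): fall back at runtime so the
# same launcher works both inside and outside the cluster:
#   try:
#       config.load_incluster_config()
#   except config.ConfigException:
#       config.load_kube_config()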

return dsl.ContainerSpec(
# In order to build the image that will be used in the container, use the build_image.sh script
# providing the required parameters. This will build the image and push it to your image repository.
# Example: export REPO_NAME="<repo-url>" export LAUNCHER_IMAGE_NAME="<image-name>" export TAG_NAME="<tag>" ./build_image.sh
image='<repo-name/launcher-image-name:tag-name>', # replace this with your image url
command=['python', '/ml/src/launch_pytorchjob.py'],
args=command_args
training_client = TrainingClient()

logging.info(f"Creating PyTorchJob in namespace: {namespace}")
training_client.create_job(job_template, namespace=namespace)

expected_conditions = ["Succeeded", "Failed"]
logging.info(
f'Monitoring job until status is any of {expected_conditions}.'
)
training_client.wait_for_job_conditions(
name=name,
namespace=namespace,
job_kind=kind,
expected_conditions=set(expected_conditions),
timeout=int(datetime.timedelta(minutes=job_timeout_minutes).total_seconds())
)
if delete_after_done:
logging.info('Deleting job after completion.')
training_client.delete_job(name, namespace)


@dsl.pipeline(
@@ -155,27 +74,16 @@ def pytorch_job_launcher(
def pytorch_job_pipeline(
kind: str = "PyTorchJob",
worker_replicas: int = 1,
ttl_seconds_after_finished: int = 3600,
job_timeout_minutes: int = 1440,
delete_after_done: bool = True,
clean_pod_policy: str ="Running"
):

namespace = get_current_namespace()
worker_spec = create_worker_spec(worker_num=worker_replicas)
master_spec = create_master_spec()

result = pytorch_job_launcher(
name=f"mnist-train-{uuid.uuid4().hex[:8]}",
kind=kind,
namespace=namespace,
version="v2",
worker_spec=worker_spec.output,
master_spec=master_spec.output,
ttl_seconds_after_finished=ttl_seconds_after_finished,
job_timeout_minutes=job_timeout_minutes,
delete_after_done=delete_after_done,
clean_pod_policy=clean_pod_policy,
version="v1",
worker_replicas=worker_replicas
)
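The rest of sample.py is collapsed in this view. For reference, a pipeline defined this way is typically compiled or submitted with the standard KFP client; the snippet below is a sketch of that usage, not part of this commit's visible diff, and the endpoint and run arguments are placeholders.

import kfp
from kfp import compiler

# Compile the pipeline to IR YAML for upload through the KFP UI or API...
compiler.Compiler().compile(
    pipeline_func=pytorch_job_pipeline,
    package_path="pytorch_job_pipeline.yaml",
)

# ...or submit it directly to a configured KFP endpoint.
kfp.Client().create_run_from_pipeline_func(
    pytorch_job_pipeline,
    arguments={"worker_replicas": 1},
)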


13 changes: 0 additions & 13 deletions components/kubeflow/pytorch-launcher/src/__init__.py

This file was deleted.

