Skip to content

Commit

Permalink
Further refactoring of pytorch launcher for pipelines v2 and training…
Browse files Browse the repository at this point in the history
… operator

Signed-off-by: Fiona-Waters [email protected]
  • Loading branch information
Fiona-Waters committed Oct 7, 2024
1 parent 6db35fb commit d635a34
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 139 deletions.
2 changes: 1 addition & 1 deletion components/kubeflow/pytorch-launcher/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ RUN pip install --no-cache-dir -r requirements.txt

ADD build /ml

ENTRYPOINT ["python", "/ml/launch_pytorchjob.py"]
ENTRYPOINT ["python", "/ml/src/launch_pytorchjob.py"]
2 changes: 2 additions & 0 deletions components/kubeflow/pytorch-launcher/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
pyyaml
kubernetes
kubeflow-pytorchjob
kubeflow.training
retrying
str2bool
223 changes: 100 additions & 123 deletions components/kubeflow/pytorch-launcher/sample.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from typing import NamedTuple
from collections import namedtuple
import kfp
from kfp import dsl
from kfp import components
from typing import Optional


def get_current_namespace():
"""Returns current namespace if available, else kubeflow"""
Expand All @@ -14,12 +13,52 @@ def get_current_namespace():
current_namespace = "kubeflow"
return current_namespace

@dsl.component(base_image="python:slim")

@dsl.component()
def create_master_spec() -> dict:
# Define master spec
master = {
"replicas": 1,
"restartPolicy": "OnFailure",
"template": {
"metadata": {
"annotations": {
# See https://github.com/kubeflow/website/issues/2011
"sidecar.istio.io/inject": "false"
}
},
"spec": {
"containers": [
{
"args": [
"--backend",
"gloo",
],
"image": "public.ecr.aws/pytorch-samples/pytorch_dist_mnist:latest",
"name": "pytorch",
"resources": {
"requests": {
"memory": "4Gi",
"cpu": "2000m",
},
"limits": {
"memory": "4Gi",
"cpu": "2000m",
},
},
}
],
},
},
}

return master


@dsl.component
def create_worker_spec(
worker_num: int = 0
) -> NamedTuple(
"CreatWorkerSpec", [("worker_spec", dict)]
): # type: ignore
) -> dict:
"""
Creates pytorch-job worker spec
"""
Expand Down Expand Up @@ -63,158 +102,96 @@ def create_worker_spec(
},
}

worker_spec_output = namedtuple(
"MyWorkerOutput", ["worker_spec"]
)
return worker_spec_output(worker)
return worker

# container component description setting inputs and implementation
@dsl.container_component
def pytorch_job_launcher(
name: str,
namespace: str = 'kubeflow',
kind: str = "PyTorchJob",
namespace: str = "kubeflow",
version: str = 'v1',
master_spec: dict = {},
worker_spec: dict = {},
job_timeout_minutes: int = 1440,
delete_after_done: bool = True,
clean_pod_policy: str = 'Running',
active_deadline_seconds: int = None,
backoff_limit: int = None,
ttl_seconds_after_finished: int = None,
active_deadline_seconds: Optional[int] = None,
backoff_limit: Optional[int] = None,
ttl_seconds_after_finished: Optional[int] = None,
):
command_args = [
'--name', name,
'--kind', kind,
'--namespace', namespace,
'--version', version,
'--masterSpec', master_spec,
'--workerSpec', worker_spec,
'--jobTimeoutMinutes', job_timeout_minutes,
'--deleteAfterDone', delete_after_done,
'--cleanPodPolicy', clean_pod_policy,]
if active_deadline_seconds is not None and isinstance(active_deadline_seconds, int):
command_args.append(['--activeDeadlineSeconds', str(active_deadline_seconds)])
if backoff_limit is not None and isinstance(backoff_limit, int):
command_args.append(['--backoffLimit', str(backoff_limit)])
if ttl_seconds_after_finished is not None and isinstance(ttl_seconds_after_finished, int):
command_args.append(['--ttlSecondsAfterFinished', str(ttl_seconds_after_finished)])

return dsl.ContainerSpec(
image='quay.io/rh_ee_fwaters/kubeflow-pytorchjob-launcher:v2',
command=['python', '/ml/launch_pytorchjob.py'],
args=[
'--name', name,
'--namespace', namespace,
'--version', version,
'--masterSpec', master_spec,
'--workerSpec', worker_spec,
'--jobTimeoutMinutes', job_timeout_minutes,
'--deleteAfterDone', delete_after_done,
'--cleanPodPolicy', clean_pod_policy,
dsl.IfPresentPlaceholder(input_name='active_deadline_seconds', then=['--activeDeadlineSeconds', active_deadline_seconds]),
dsl.IfPresentPlaceholder(input_name='backoff_limit', then=['--backoffLimit', backoff_limit]),
dsl.IfPresentPlaceholder(input_name='ttl_seconds_after_finished', then=['--ttlSecondsAfterFinished', ttl_seconds_after_finished])
]
command=['python', '/ml/src/launch_pytorchjob.py'],
args=command_args
)


@dsl.pipeline(
name="launch-kubeflow-pytorchjob",
description="An example to launch pytorch.",
)
def pytorch_job_pipeline():
pytorch_job = pytorch_job_launcher(
name="sample-pytorch-job",
namespace="kubeflow",
version="v1",
master_spec={},
worker_spec={},
job_timeout_minutes=1440,
delete_after_done=True,
clean_pod_policy="Running"
)

@dsl.component()
def mnist_train(
namespace: str = get_current_namespace(),
def pytorch_job_pipeline(
kind: str = "PyTorchJob",
worker_replicas: int = 1,
ttl_seconds_after_finished: int = -1,
job_timeout_minutes: int = 600,
delete_after_done: bool = False,
ttl_seconds_after_finished: int = 3600,
job_timeout_minutes: int = 1440,
delete_after_done: bool = True,
clean_pod_policy: str ="Running"
):
pytorchjob_launcher_op = pytorch_job_pipeline

namespace = get_current_namespace()
worker_spec = create_worker_spec(worker_num=worker_replicas)
master_spec = create_master_spec()

master = {
"replicas": 1,
"restartPolicy": "OnFailure",
"template": {
"metadata": {
"annotations": {
# See https://github.com/kubeflow/website/issues/2011
"sidecar.istio.io/inject": "false"
}
},
"spec": {
"containers": [
{
# To override default command
# "command": [
# "python",
# "/opt/mnist/src/mnist.py"
# ],
"args": [
"--backend",
"gloo",
],
# Or, create your own image from
# https://github.com/kubeflow/pytorch-operator/tree/master/examples/mnist
"image": "public.ecr.aws/pytorch-samples/pytorch_dist_mnist:latest",
"name": "pytorch",
"resources": {
"requests": {
"memory": "4Gi",
"cpu": "2000m",
# Uncomment for GPU
# "nvidia.com/gpu": 1,
},
"limits": {
"memory": "4Gi",
"cpu": "2000m",
# Uncomment for GPU
# "nvidia.com/gpu": 1,
},
},
}
],
# If imagePullSecrets required
# "imagePullSecrets": [
# {"name": "image-pull-secret"},
# ],
},
},
}
create_worker_spec(worker_replicas)

# Launch and monitor the job with the launcher
pytorchjob_launcher_op(
# Note: name needs to be a unique pytorchjob name in the namespace.
# Using RUN_ID_PLACEHOLDER is one way of getting something unique.
name=f"name-{kfp.dsl.RUN_ID_PLACEHOLDER}",
result = pytorch_job_launcher(
name=f"mnist-train-{kfp.dsl.PIPELINE_JOB_ID_PLACEHOLDER}",
kind=kind,
namespace=namespace,
master_spec=master,
# pass worker_spec as a string because the JSON serializer will convert
# the placeholder for worker_replicas (which it sees as a string) into
# a quoted variable (eg a string) instead of an unquoted variable
# (number). If worker_replicas is quoted in the spec, it will break in
# k8s. See https://github.com/kubeflow/pipelines/issues/4776
worker_spec=create_worker_spec.outputs[
"worker_spec"
],
version="v2",
worker_spec=worker_spec.output,
master_spec=master_spec.output,
ttl_seconds_after_finished=ttl_seconds_after_finished,
job_timeout_minutes=job_timeout_minutes,
delete_after_done=delete_after_done,
clean_pod_policy=clean_pod_policy,
)


if __name__ == "__main__":
import kfp.compiler as compiler

pipeline_file = "test.tar.gz"
pipeline_file = "test.yaml"
print(
f"Compiling pipeline as {pipeline_file}"
)
compiler.Compiler().compile(
mnist_train, pipeline_file
pytorch_job_pipeline, pipeline_file
)

# # To run:
# client = kfp.Client()
# run = client.create_run_from_pipeline_package(
# pipeline_file,
# arguments={},
# run_name="test pytorchjob run"
# )
# print(f"Created run {run}")
# To run:
host="http://localhost:8080"
client = kfp.Client(host=host)
run = client.create_run_from_pipeline_package(
pipeline_file,
arguments={},
run_name="test pytorchjob run"
)
print(f"Created run {run}")
34 changes: 19 additions & 15 deletions components/kubeflow/pytorch-launcher/src/launch_pytorchjob.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import argparse
import datetime
from distutils.util import strtobool
from str2bool import str2bool
import logging
import yaml

from kubernetes import client as k8s_client
from kubernetes import config

import launch_crd
from kubeflow.pytorchjob import V1PyTorchJob as V1PyTorchJob_original
from kubeflow.pytorchjob import V1PyTorchJobSpec as V1PyTorchJobSpec_original
from kubeflow.training import TrainingClient
from kubeflow.training import KubeflowOrgV1RunPolicy
from kubeflow.training import KubeflowOrgV1PyTorchJob
from kubeflow.training import KubeflowOrgV1PyTorchJobSpec


def yamlOrJsonStr(string):
Expand All @@ -30,16 +30,16 @@ def get_current_namespace():


# Patch PyTorchJob APIs to align with k8s usage
class V1PyTorchJob(V1PyTorchJob_original):
class V1PyTorchJob(KubeflowOrgV1PyTorchJob):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.openapi_types = self.swagger_types
# self.openapi_types = self.swagger_types


class V1PyTorchJobSpec(V1PyTorchJobSpec_original):
class V1PyTorchJobSpec(KubeflowOrgV1PyTorchJobSpec):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.openapi_types = self.swagger_types
# self.openapi_types = self.swagger_types


def get_arg_parser():
Expand Down Expand Up @@ -71,7 +71,7 @@ def get_arg_parser():
parser.add_argument('--workerSpec', type=yamlOrJsonStr,
default={},
help='Job worker replicaSpecs.')
parser.add_argument('--deleteAfterDone', type=strtobool,
parser.add_argument('--deleteAfterDone', type=str2bool,
default=True,
help='When Job done, delete the Job automatically if it is True.')
parser.add_argument('--jobTimeoutMinutes', type=int,
Expand Down Expand Up @@ -100,7 +100,7 @@ def main(args):
'Master': args.masterSpec,
'Worker': args.workerSpec,
},
run_policy=k8s_client.V1RunPolicy(
run_policy=KubeflowOrgV1RunPolicy(
active_deadline_seconds=args.activeDeadlineSeconds,
backoff_limit=args.backoffLimit,
clean_pod_policy=args.cleanPodPolicy,
Expand All @@ -119,16 +119,20 @@ def main(args):
),
spec=jobSpec,
)

serialized_job = k8s_client.ApiClient().sanitize_for_serialization(job)
# print(job)
# serialized_job = k8s_client.ApiClient().sanitize_for_serialization(job)
# print(serialized_job)

logging.info('Creating TrainingClient.')

config.load_incluster_config()
# remove one of these depending on where you are running this
# config.load_incluster_config()
config.load_kube_config()

training_client = TrainingClient()

logging.info('Submitting PyTorchJob.')
training_client.create_job(serialized_job)
logging.info(f"Creating PyTorchJob in namespace: {args.namespace}")
training_client.create_job(job, namespace=args.namespace)

expected_conditions = ["Succeeded", "Failed"]
logging.info(
Expand Down

0 comments on commit d635a34

Please sign in to comment.