Add KFP pipeline for running a PyTorch job #14

Draft · wants to merge 2 commits into main
40 changes: 40 additions & 0 deletions training/component.yaml
@@ -0,0 +1,40 @@
description: Kubeflow PyTorchJob launcher
inputs:
- {name: name, type: String, description: 'PyTorchJob name.'}
- {name: namespace, type: String, default: kubeflow, description: 'PyTorchJob namespace (likely your current namespace).'}
- {name: version, type: String, default: v1, description: 'PyTorchJob version.'}
- {name: master_spec, type: JsonObject, default: '{}', description: 'PyTorchJob Master replicaSpecs.'}
- {name: worker_spec, type: JsonObject, default: '{}', description: 'PyTorchJob Worker replicaSpecs.'}
- {name: job_timeout_minutes, type: Integer, default: 1440, description: 'Time in minutes to wait for the job to complete.'}
- {name: delete_after_done, type: Boolean, default: 'True' , description: 'Whether to delete the job after it is finished.'}
- {name: clean_pod_policy, type: String, default: Running, description: 'Defines the policy for cleaning up pods after the PyTorchJob completes.'}
- {name: active_deadline_seconds, type: Integer, optional: true, description: 'Specifies the duration (in seconds) since startTime during which the job can remain active before it is terminated. Must be a positive integer. This setting applies only to pods where restartPolicy is OnFailure or Always.'}
- {name: backoff_limit, type: Integer, optional: true, description: 'Number of retries before marking this job as failed.'}
- {name: ttl_seconds_after_finished, type: Integer, optional: true, description: 'Defines the TTL for cleaning up finished PyTorchJobs.'}
implementation:
container:
image: cascribner/kubeflow-pytorchjob-launcher:v1
command: [python, /ml/launch_pytorchjob.py]
args:
- --name
- {inputValue: name}
- --namespace
- {inputValue: namespace}
- --version
- {inputValue: version}
- --masterSpec
- {inputValue: master_spec}
- --workerSpec
- {inputValue: worker_spec}
- --jobTimeoutMinutes
- {inputValue: job_timeout_minutes}
- --deleteAfterDone
- {inputValue: delete_after_done}
- --cleanPodPolicy
- {inputValue: clean_pod_policy}
- --activeDeadlineSeconds
- {inputValue: active_deadline_seconds}
- --backoffLimit
- {inputValue: backoff_limit}
- --ttlSecondsAfterFinished
- {inputValue: ttl_seconds_after_finished}
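For orientation, a minimal, hypothetical sketch (not part of this PR) of how the launcher component above can be consumed from a KFP pipeline. The smoke-test job name, the pytorch/pytorch:latest image, the inline command, and the training/component.yaml path are illustrative placeholders; the real example is training/pipeline.py below.

import kfp.dsl as dsl
from kfp import compiler, components

# Load the launcher component defined above (path assumed relative to the repo root).
launcher_op = components.load_component_from_file("training/component.yaml")


@dsl.pipeline(name="pytorchjob-launcher-smoke-test")
def smoke_test(namespace: str = "kubeflow"):
    # A single master replica that prints a message and exits.
    master = {
        "replicas": 1,
        "restartPolicy": "OnFailure",
        "template": {
            "spec": {
                "containers": [
                    {
                        "name": "pytorch",
                        "image": "pytorch/pytorch:latest",  # placeholder image
                        "command": ["python", "-c", "print('hello from PyTorchJob')"],
                    }
                ]
            }
        },
    }
    launcher_op(name="smoke-test-job", namespace=namespace, master_spec=master)


if __name__ == "__main__":
    compiler.Compiler().compile(smoke_test, "smoke_test.yaml")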
127 changes: 127 additions & 0 deletions training/pipeline.py
@@ -0,0 +1,127 @@
import kfp.dsl as dsl
from kfp import components
import kfp.compiler as compiler


@dsl.pipeline(name="launch-kubeflow-pytorchjob",
description="An example to launch pytorch.")
def ilab_train(
namespace: str = "mcliffor",
worker_replicas: int = 1,
ttl_seconds_after_finished: int = -1,
job_timeout_minutes: int = 600,
delete_after_done: bool = False):

pytorchjob_launcher_op = components.load_component_from_file("component.yaml")

master = {
"replicas": 1,
"restartPolicy": "OnFailure",
"template": {
"metadata": {
"annotations": {
# See https://github.com/kubeflow/website/issues/2011
"sidecar.istio.io/inject": "false"
}
},
"spec": {
"containers": [
{
# To override default command
"command": [
'/bin/bash',
'-c',
'--'
],
"args": [
"python3.11 -u run.py"
],
# Or, create your own image from
# https://github.com/kubeflow/pytorch-operator/tree/master/examples/mnist
"image": "quay.io/michaelclifford/test-train:0.0.11",
"name": "pytorch",
"resources": {
"requests": {
"memory": "8Gi",
"cpu": "2000m",
                                # Remove if no GPU is available
"nvidia.com/gpu": 1,
},
"limits": {
"memory": "8Gi",
"cpu": "2000m",
                                # Remove if no GPU is available
"nvidia.com/gpu": 1,
},
Comment on lines +41 to +55 (Member):
Should we parametrize this?
},
}
],
# If imagePullSecrets required
# "imagePullSecrets": [
# {"name": "image-pull-secret"},
# ],
},
},
}

worker = {}
if worker_replicas > 0:
worker = {
"replicas": worker_replicas,
"restartPolicy": "OnFailure",
"template": {
"metadata": {
"annotations": {
"sidecar.istio.io/inject": "false"
}
},
"spec": {
"containers": [
{ "command": [
'/bin/bash',
'-c',
'--'
],
"args": [
"python3.11 -u run.py"
],
"image": "quay.io/michaelclifford/test-train:0.0.11",
"name": "pytorch",
"resources": {
"requests": {
"memory": "8Gi",
"cpu": "2000m",
                                    # Remove if no GPU is available
"nvidia.com/gpu": 1,
},
"limits": {
"memory": "8Gi",
"cpu": "2000m",
                                    # Remove if no GPU is available
"nvidia.com/gpu": 1,
},
},
}
]
},
},
}

# Launch and monitor the job with the launcher
pytorchjob_launcher_op(
name="pytorch-job",
namespace=namespace,
master_spec=master,
        worker_spec=worker,
ttl_seconds_after_finished=ttl_seconds_after_finished,
job_timeout_minutes=job_timeout_minutes,
delete_after_done=delete_after_done,
active_deadline_seconds=100,
backoff_limit=1)


if __name__ == "__main__":
pipeline_file = "pipeline.yaml"
print(f"Compiling pipeline as {pipeline_file}")
compiler.Compiler().compile(ilab_train,
pipeline_file)
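As one possible answer to the review question above about parametrizing the resource block, here is a hypothetical sketch (not part of this PR) that exposes the container resources as pipeline parameters, following the same pattern already used to thread worker_replicas into worker_spec. The helper container_spec and the parameter names cpu_per_replica, memory_per_replica, and gpus_per_replica are illustrative.

import kfp.dsl as dsl
from kfp import components


def container_spec(image: str, cpu: str, memory: str, gpus: int) -> dict:
    """Build the container spec shared by the master and worker replicas."""
    resources = {"cpu": cpu, "memory": memory, "nvidia.com/gpu": gpus}
    return {
        "name": "pytorch",
        "image": image,
        "command": ["/bin/bash", "-c", "--"],
        "args": ["python3.11 -u run.py"],
        "resources": {"requests": dict(resources), "limits": dict(resources)},
    }


@dsl.pipeline(name="launch-kubeflow-pytorchjob-parametrized")
def ilab_train_parametrized(
    namespace: str = "mcliffor",
    worker_replicas: int = 1,
    cpu_per_replica: str = "2000m",
    memory_per_replica: str = "8Gi",
    gpus_per_replica: int = 1,
):
    launcher_op = components.load_component_from_file("component.yaml")
    container = container_spec(
        image="quay.io/michaelclifford/test-train:0.0.11",
        cpu=cpu_per_replica,
        memory=memory_per_replica,
        gpus=gpus_per_replica,
    )
    replica_template = {
        "metadata": {"annotations": {"sidecar.istio.io/inject": "false"}},
        "spec": {"containers": [container]},
    }
    launcher_op(
        name="pytorch-job",
        namespace=namespace,
        master_spec={"replicas": 1, "restartPolicy": "OnFailure",
                     "template": replica_template},
        worker_spec={"replicas": worker_replicas, "restartPolicy": "OnFailure",
                     "template": replica_template},
    )

As with worker_replicas, the compiler would serialize these parameters into the spec dicts as placeholders; recompiling and inspecting the resulting pipeline.yaml is the easiest way to confirm the substitution behaves as expected.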
198 changes: 198 additions & 0 deletions training/pipeline.yaml
@@ -0,0 +1,198 @@
# PIPELINE DEFINITION
# Name: launch-kubeflow-pytorchjob
# Description: An example pipeline that launches a PyTorchJob.
# Inputs:
# delete_after_done: bool [Default: False]
# job_timeout_minutes: int [Default: 600.0]
# namespace: str [Default: 'mcliffor']
# ttl_seconds_after_finished: int [Default: -1.0]
# worker_replicas: int [Default: 1.0]
components:
comp-name:
executorLabel: exec-name
inputDefinitions:
parameters:
active_deadline_seconds:
isOptional: true
parameterType: NUMBER_INTEGER
backoff_limit:
isOptional: true
parameterType: NUMBER_INTEGER
clean_pod_policy:
defaultValue: Running
isOptional: true
parameterType: STRING
delete_after_done:
defaultValue: true
isOptional: true
parameterType: BOOLEAN
job_timeout_minutes:
defaultValue: 1440.0
isOptional: true
parameterType: NUMBER_INTEGER
master_spec:
defaultValue: {}
isOptional: true
parameterType: STRUCT
name:
parameterType: STRING
namespace:
defaultValue: kubeflow
isOptional: true
parameterType: STRING
ttl_seconds_after_finished:
isOptional: true
parameterType: NUMBER_INTEGER
version:
defaultValue: v1
isOptional: true
parameterType: STRING
worker_spec:
defaultValue: {}
isOptional: true
parameterType: STRUCT
deploymentSpec:
executors:
exec-name:
container:
args:
- --name
- '{{$.inputs.parameters[''name'']}}'
- --namespace
- '{{$.inputs.parameters[''namespace'']}}'
- --version
- '{{$.inputs.parameters[''version'']}}'
- --masterSpec
- '{{$.inputs.parameters[''master_spec'']}}'
- --workerSpec
- '{{$.inputs.parameters[''worker_spec'']}}'
- --jobTimeoutMinutes
- '{{$.inputs.parameters[''job_timeout_minutes'']}}'
- --deleteAfterDone
- '{{$.inputs.parameters[''delete_after_done'']}}'
- --cleanPodPolicy
- '{{$.inputs.parameters[''clean_pod_policy'']}}'
- --activeDeadlineSeconds
- '{{$.inputs.parameters[''active_deadline_seconds'']}}'
- --backoffLimit
- '{{$.inputs.parameters[''backoff_limit'']}}'
- --ttlSecondsAfterFinished
- '{{$.inputs.parameters[''ttl_seconds_after_finished'']}}'
command:
- python
- /ml/launch_pytorchjob.py
image: cascribner/kubeflow-pytorchjob-launcher:v1
pipelineInfo:
  description: An example pipeline that launches a PyTorchJob.
name: launch-kubeflow-pytorchjob
root:
dag:
tasks:
name:
cachingOptions:
enableCache: true
componentRef:
name: comp-name
inputs:
parameters:
active_deadline_seconds:
runtimeValue:
constant: 100.0
backoff_limit:
runtimeValue:
constant: 1.0
delete_after_done:
componentInputParameter: delete_after_done
job_timeout_minutes:
componentInputParameter: job_timeout_minutes
master_spec:
runtimeValue:
constant:
replicas: 1.0
restartPolicy: OnFailure
template:
metadata:
annotations:
sidecar.istio.io/inject: 'false'
spec:
containers:
- args:
- python3.11 -u run.py
command:
- /bin/bash
- -c
- --
image: quay.io/michaelclifford/test-train:0.0.11
name: pytorch
resources:
limits:
cpu: 2000m
memory: 8Gi
nvidia.com/gpu: 1.0
requests:
cpu: 2000m
memory: 8Gi
nvidia.com/gpu: 1.0
name:
runtimeValue:
constant: pytorch-job
namespace:
componentInputParameter: namespace
pipelinechannel--worker_replicas:
componentInputParameter: worker_replicas
ttl_seconds_after_finished:
componentInputParameter: ttl_seconds_after_finished
worker_spec:
runtimeValue:
constant:
replicas: '{{$.inputs.parameters[''pipelinechannel--worker_replicas'']}}'
restartPolicy: OnFailure
template:
metadata:
annotations:
sidecar.istio.io/inject: 'false'
spec:
containers:
- args:
- python3.11 -u run.py
command:
- /bin/bash
- -c
- --
image: quay.io/michaelclifford/test-train:0.0.11
name: pytorch
resources:
limits:
cpu: 2000m
memory: 8Gi
nvidia.com/gpu: 1.0
requests:
cpu: 2000m
memory: 8Gi
nvidia.com/gpu: 1.0
taskInfo:
name: name
inputDefinitions:
parameters:
delete_after_done:
defaultValue: false
isOptional: true
parameterType: BOOLEAN
job_timeout_minutes:
defaultValue: 600.0
isOptional: true
parameterType: NUMBER_INTEGER
namespace:
defaultValue: mcliffor
isOptional: true
parameterType: STRING
ttl_seconds_after_finished:
defaultValue: -1.0
isOptional: true
parameterType: NUMBER_INTEGER
worker_replicas:
defaultValue: 1.0
isOptional: true
parameterType: NUMBER_INTEGER
schemaVersion: 2.1.0
sdkVersion: kfp-2.8.0
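Once compiled, the pipeline package can be submitted to a Kubeflow Pipelines endpoint. A hypothetical sketch (not part of this PR); the host URL, namespace, and argument values are placeholders for your own environment:

from kfp import Client

# Connect to the KFP API server; replace the host with your endpoint.
client = Client(host="https://<your-kfp-endpoint>")

run = client.create_run_from_pipeline_package(
    "training/pipeline.yaml",
    arguments={
        "namespace": "my-namespace",  # namespace in which the PyTorchJob is created
        "worker_replicas": 1,
        "job_timeout_minutes": 600,
        "delete_after_done": False,
    },
    run_name="launch-kubeflow-pytorchjob",
)
print(f"Submitted run {run.run_id}")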