Skip to content

Commit

Permalink
feat(sdk): support dynamic machine type parameters in pipeline task s…
Browse files Browse the repository at this point in the history
…etters (kubeflow#11097)

* temp title: change title

Signed-off-by: KevinGrantLee <[email protected]>
  • Loading branch information
KevinGrantLee authored and chensun committed Sep 20, 2024
1 parent f4857ad commit 70aaf8a
Show file tree
Hide file tree
Showing 11 changed files with 608 additions and 130 deletions.
1 change: 1 addition & 0 deletions sdk/RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Current Version (in development)

## Features
* Support dynamic machine type parameters in pipeline task setters. [\#11097](https://github.com/kubeflow/pipelines/pull/11097)

## Breaking changes

Expand Down
24 changes: 12 additions & 12 deletions sdk/python/kfp/compiler/compiler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3382,31 +3382,31 @@ def simple_pipeline():
['exec-return-1']['container'])

self.assertEqual(
5, dict_format['deploymentSpec']['executors']['exec-return-1-2']
['container']['resources']['cpuLimit'])
'5', dict_format['deploymentSpec']['executors']['exec-return-1-2']
['container']['resources']['resourceCpuLimit'])
self.assertNotIn(
'memoryLimit', dict_format['deploymentSpec']['executors']
['exec-return-1-2']['container']['resources'])

self.assertEqual(
50, dict_format['deploymentSpec']['executors']['exec-return-1-3']
['container']['resources']['memoryLimit'])
'50G', dict_format['deploymentSpec']['executors']['exec-return-1-3']
['container']['resources']['resourceMemoryLimit'])
self.assertNotIn(
'cpuLimit', dict_format['deploymentSpec']['executors']
['exec-return-1-3']['container']['resources'])

self.assertEqual(
2, dict_format['deploymentSpec']['executors']['exec-return-1-4']
['container']['resources']['cpuRequest'])
'2', dict_format['deploymentSpec']['executors']['exec-return-1-4']
['container']['resources']['resourceCpuRequest'])
self.assertEqual(
5, dict_format['deploymentSpec']['executors']['exec-return-1-4']
['container']['resources']['cpuLimit'])
'5', dict_format['deploymentSpec']['executors']['exec-return-1-4']
['container']['resources']['resourceCpuLimit'])
self.assertEqual(
4, dict_format['deploymentSpec']['executors']['exec-return-1-4']
['container']['resources']['memoryRequest'])
'4G', dict_format['deploymentSpec']['executors']['exec-return-1-4']
['container']['resources']['resourceMemoryRequest'])
self.assertEqual(
50, dict_format['deploymentSpec']['executors']['exec-return-1-4']
['container']['resources']['memoryLimit'])
'50G', dict_format['deploymentSpec']['executors']['exec-return-1-4']
['container']['resources']['resourceMemoryLimit'])


class TestPlatformConfig(unittest.TestCase):
Expand Down
49 changes: 39 additions & 10 deletions sdk/python/kfp/compiler/pipeline_spec_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@ def build_task_spec_for_task(
pipeline_task_spec.retry_policy.CopyFrom(
task._task_spec.retry_policy.to_proto())

# Inject resource fields into inputs
if task.container_spec and task.container_spec.resources:
for key, val in task.container_spec.resources.__dict__.items():
if val and pipeline_channel.extract_pipeline_channels_from_any(val):
task.inputs[key] = val

for input_name, input_value in task.inputs.items():
# Since LoopParameterArgument and LoopArtifactArgument and LoopArgumentVariable are narrower
# types than PipelineParameterChannel, start with them.
Expand Down Expand Up @@ -607,6 +613,24 @@ def build_container_spec_for_task(
Returns:
A PipelineContainerSpec object for the task.
"""

def convert_to_placeholder(input_value: str) -> str:
"""Checks if input is a pipeline channel and if so, converts to
compiler injected input name."""
pipeline_channels = (
pipeline_channel.extract_pipeline_channels_from_any(input_value))
if pipeline_channels:
assert len(pipeline_channels) == 1
channel = pipeline_channels[0]
additional_input_name = (
compiler_utils.additional_input_name_for_pipeline_channel(
channel))
additional_input_placeholder = placeholders.InputValuePlaceholder(
additional_input_name)._to_string()
input_value = input_value.replace(channel.pattern,
additional_input_placeholder)
return input_value

container_spec = (
pipeline_spec_pb2.PipelineDeploymentConfig.PipelineContainerSpec(
image=task.container_spec.image,
Expand All @@ -620,23 +644,28 @@ def build_container_spec_for_task(

if task.container_spec.resources is not None:
if task.container_spec.resources.cpu_request is not None:
container_spec.resources.cpu_request = (
task.container_spec.resources.cpu_request)
container_spec.resources.resource_cpu_request = (
convert_to_placeholder(
task.container_spec.resources.cpu_request))
if task.container_spec.resources.cpu_limit is not None:
container_spec.resources.cpu_limit = (
task.container_spec.resources.cpu_limit)
container_spec.resources.resource_cpu_limit = (
convert_to_placeholder(task.container_spec.resources.cpu_limit))
if task.container_spec.resources.memory_request is not None:
container_spec.resources.memory_request = (
task.container_spec.resources.memory_request)
container_spec.resources.resource_memory_request = (
convert_to_placeholder(
task.container_spec.resources.memory_request))
if task.container_spec.resources.memory_limit is not None:
container_spec.resources.memory_limit = (
task.container_spec.resources.memory_limit)
container_spec.resources.resource_memory_limit = (
convert_to_placeholder(
task.container_spec.resources.memory_limit))
if task.container_spec.resources.accelerator_count is not None:
container_spec.resources.accelerator.CopyFrom(
pipeline_spec_pb2.PipelineDeploymentConfig.PipelineContainerSpec
.ResourceSpec.AcceleratorConfig(
type=task.container_spec.resources.accelerator_type,
count=task.container_spec.resources.accelerator_count,
resource_type=convert_to_placeholder(
task.container_spec.resources.accelerator_type),
resource_count=convert_to_placeholder(
task.container_spec.resources.accelerator_count),
))

return container_spec
Expand Down
116 changes: 57 additions & 59 deletions sdk/python/kfp/dsl/pipeline_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,9 @@ def _ensure_container_spec_exists(self) -> None:
f'{caller_method_name} can only be used on single-step components, not pipelines used as components, or special components like importers.'
)

def _validate_cpu_request_limit(self, cpu: str) -> float:
def _validate_cpu_request_limit(self, cpu: str) -> str:
"""Validates cpu request/limit string and converts to its numeric
value.
string value.
Args:
cpu: CPU requests or limits. This string should be a number or a
Expand All @@ -335,17 +335,22 @@ def _validate_cpu_request_limit(self, cpu: str) -> float:
ValueError if the cpu request/limit string value is invalid.
Returns:
The numeric value (float) of the cpu request/limit.
The numeric string of the cpu request/limit.
"""
if re.match(r'([0-9]*[.])?[0-9]+m?$', cpu) is None:
raise ValueError(
'Invalid cpu string. Should be float or integer, or integer'
' followed by "m".')

return float(cpu[:-1]) / 1000 if cpu.endswith('m') else float(cpu)
if isinstance(cpu, pipeline_channel.PipelineChannel):
cpu = str(cpu)
else:
if re.match(r'([0-9]*[.])?[0-9]+m?$', cpu) is None:
raise ValueError(
'Invalid cpu string. Should be float or integer, or integer'
' followed by "m".')
return cpu

@block_if_final()
def set_cpu_request(self, cpu: str) -> 'PipelineTask':
def set_cpu_request(
self,
cpu: Union[str,
pipeline_channel.PipelineChannel]) -> 'PipelineTask':
"""Sets CPU request (minimum) for the task.
Args:
Expand All @@ -370,7 +375,10 @@ def set_cpu_request(self, cpu: str) -> 'PipelineTask':
return self

@block_if_final()
def set_cpu_limit(self, cpu: str) -> 'PipelineTask':
def set_cpu_limit(
self,
cpu: Union[str,
pipeline_channel.PipelineChannel]) -> 'PipelineTask':
"""Sets CPU limit (maximum) for the task.
Args:
Expand All @@ -395,7 +403,9 @@ def set_cpu_limit(self, cpu: str) -> 'PipelineTask':
return self

@block_if_final()
def set_accelerator_limit(self, limit: int) -> 'PipelineTask':
def set_accelerator_limit(
self, limit: Union[int, str,
pipeline_channel.PipelineChannel]) -> 'PipelineTask':
"""Sets accelerator limit (maximum) for the task. Only applies if
accelerator type is also set via .set_accelerator_type().
Expand All @@ -406,11 +416,15 @@ def set_accelerator_limit(self, limit: int) -> 'PipelineTask':
Self return to allow chained setting calls.
"""
self._ensure_container_spec_exists()

if isinstance(limit, str):
if re.match(r'[1-9]\d*$', limit) is None:
raise ValueError(f'{"limit"!r} must be positive integer.')
limit = int(limit)
if isinstance(limit, pipeline_channel.PipelineChannel):
limit = str(limit)
else:
if isinstance(limit, int):
limit = str(limit)
if isinstance(limit, str) and re.match(r'^0$|^1$|^2$|^4$|^8$|^16$',
limit) is None:
raise ValueError(
f'{"limit"!r} must be one of 0, 1, 2, 4, 8, 16.')

if self.container_spec.resources is not None:
self.container_spec.resources.accelerator_count = limit
Expand Down Expand Up @@ -438,9 +452,9 @@ def set_gpu_limit(self, gpu: str) -> 'PipelineTask':
category=DeprecationWarning)
return self.set_accelerator_limit(gpu)

def _validate_memory_request_limit(self, memory: str) -> float:
def _validate_memory_request_limit(self, memory: str) -> str:
"""Validates memory request/limit string and converts to its numeric
value.
string value.
Args:
memory: Memory requests or limits. This string should be a number or
Expand All @@ -451,47 +465,24 @@ def _validate_memory_request_limit(self, memory: str) -> float:
ValueError if the memory request/limit string value is invalid.
Returns:
The numeric value (float) of the memory request/limit.
The numeric string value of the memory request/limit.
"""
if re.match(r'^[0-9]+(E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki){0,1}$',
memory) is None:
raise ValueError(
'Invalid memory string. Should be a number or a number '
'followed by one of "E", "Ei", "P", "Pi", "T", "Ti", "G", '
'"Gi", "M", "Mi", "K", "Ki".')

if memory.endswith('E'):
memory = float(memory[:-1]) * constants._E / constants._G
elif memory.endswith('Ei'):
memory = float(memory[:-2]) * constants._EI / constants._G
elif memory.endswith('P'):
memory = float(memory[:-1]) * constants._P / constants._G
elif memory.endswith('Pi'):
memory = float(memory[:-2]) * constants._PI / constants._G
elif memory.endswith('T'):
memory = float(memory[:-1]) * constants._T / constants._G
elif memory.endswith('Ti'):
memory = float(memory[:-2]) * constants._TI / constants._G
elif memory.endswith('G'):
memory = float(memory[:-1])
elif memory.endswith('Gi'):
memory = float(memory[:-2]) * constants._GI / constants._G
elif memory.endswith('M'):
memory = float(memory[:-1]) * constants._M / constants._G
elif memory.endswith('Mi'):
memory = float(memory[:-2]) * constants._MI / constants._G
elif memory.endswith('K'):
memory = float(memory[:-1]) * constants._K / constants._G
elif memory.endswith('Ki'):
memory = float(memory[:-2]) * constants._KI / constants._G
if isinstance(memory, pipeline_channel.PipelineChannel):
memory = str(memory)
else:
# By default interpret as a plain integer, in the unit of Bytes.
memory = float(memory) / constants._G

if re.match(r'^[0-9]+(E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki){0,1}$',
memory) is None:
raise ValueError(
'Invalid memory string. Should be a number or a number '
'followed by one of "E", "Ei", "P", "Pi", "T", "Ti", "G", '
'"Gi", "M", "Mi", "K", "Ki".')
return memory

@block_if_final()
def set_memory_request(self, memory: str) -> 'PipelineTask':
def set_memory_request(
self,
memory: Union[str,
pipeline_channel.PipelineChannel]) -> 'PipelineTask':
"""Sets memory request (minimum) for the task.
Args:
Expand All @@ -515,7 +506,10 @@ def set_memory_request(self, memory: str) -> 'PipelineTask':
return self

@block_if_final()
def set_memory_limit(self, memory: str) -> 'PipelineTask':
def set_memory_limit(
self,
memory: Union[str,
pipeline_channel.PipelineChannel]) -> 'PipelineTask':
"""Sets memory limit (maximum) for the task.
Args:
Expand Down Expand Up @@ -579,7 +573,9 @@ def add_node_selector_constraint(self, accelerator: str) -> 'PipelineTask':
return self.set_accelerator_type(accelerator)

@block_if_final()
def set_accelerator_type(self, accelerator: str) -> 'PipelineTask':
def set_accelerator_type(
self, accelerator: Union[str, pipeline_channel.PipelineChannel]
) -> 'PipelineTask':
"""Sets accelerator type to use when executing this task.
Args:
Expand All @@ -589,14 +585,16 @@ def set_accelerator_type(self, accelerator: str) -> 'PipelineTask':
Self return to allow chained setting calls.
"""
self._ensure_container_spec_exists()
if isinstance(accelerator, pipeline_channel.PipelineChannel):
accelerator = str(accelerator)

if self.container_spec.resources is not None:
self.container_spec.resources.accelerator_type = accelerator
if self.container_spec.resources.accelerator_count is None:
self.container_spec.resources.accelerator_count = 1
self.container_spec.resources.accelerator_count = '1'
else:
self.container_spec.resources = structures.ResourceSpec(
accelerator_count=1, accelerator_type=accelerator)
accelerator_count='1', accelerator_type=accelerator)

return self

Expand Down
Loading

0 comments on commit 70aaf8a

Please sign in to comment.