Skip to content

Commit

Permalink
Introduces PipelineConfig class with TTL configuration
Browse files Browse the repository at this point in the history
Signed-off-by: Ricardo M. Oliveira <[email protected]>
  • Loading branch information
rimolive committed Oct 22, 2024
1 parent dc757a4 commit 479f72e
Show file tree
Hide file tree
Showing 10 changed files with 1,584 additions and 1,293 deletions.
2,757 changes: 1,485 additions & 1,272 deletions api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion api/v2alpha1/pipeline_spec.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1080,7 +1080,7 @@ message PlatformSpec {
// Platform key to full platform config
map<string, SinglePlatformSpec> platforms = 1;

optional int32 ttlStrategysecondsAfterCompletion = 2;

}

message SinglePlatformSpec {
Expand All @@ -1093,8 +1093,14 @@ message SinglePlatformSpec {
// Arbitrary configuration, which will be defined by the platform
// protos/libraries.
google.protobuf.Struct config = 3;

PipelineConfig pipelineConfig = 4;
}

message PipelineConfig {
// Set TTL at pipeline-level
optional int32 pipelineTtl = 1;
}

message PlatformDeploymentConfig {
// Map of executor label to executor-level config
Expand Down
23 changes: 20 additions & 3 deletions backend/src/apiserver/template/v2_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"regexp"
"strings"

"github.com/golang/glog"
structpb "github.com/golang/protobuf/ptypes/struct"
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
"github.com/kubeflow/pipelines/backend/src/apiserver/model"
Expand Down Expand Up @@ -77,9 +78,17 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Sche
}
}

var pipeline_options argocompiler.Options
if t.platformSpec.PipelineConfig.Ttl != nil {

Check failure on line 82 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

t.platformSpec.PipelineConfig undefined (type *pipelinespec.PlatformSpec has no field or method PipelineConfig)

Check failure on line 82 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

t.platformSpec.PipelineConfig undefined (type *pipelinespec.PlatformSpec has no field or method PipelineConfig)
glog.Info("Found pipeline config")
pipeline_options = argocompiler.Options{
TtlSeconds: *t.platformSpec.PipelineConfig.Ttl,

Check failure on line 85 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

t.platformSpec.PipelineConfig undefined (type *pipelinespec.PlatformSpec has no field or method PipelineConfig)

Check failure on line 85 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

t.platformSpec.PipelineConfig undefined (type *pipelinespec.PlatformSpec has no field or method PipelineConfig)
}
}

var obj interface{}
if util.CurrentExecutionType() == util.ArgoWorkflow {
obj, err = argocompiler.Compile(job, kubernetesSpec, nil)
obj, err = argocompiler.Compile(job, kubernetesSpec, &pipeline_options)
} else if util.CurrentExecutionType() == util.TektonPipelineRun {
obj, err = tektoncompiler.Compile(job, kubernetesSpec, &tektoncompiler.Options{LauncherImage: Launcher})
}
Expand Down Expand Up @@ -300,9 +309,17 @@ func (t *V2Spec) RunWorkflow(modelRun *model.Run, options RunWorkflowOptions) (u
}
}

var pipeline_options *argocompiler.Options
if t.platformSpec.PipelineConfig.Ttl != nil {

Check failure on line 313 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

t.platformSpec.PipelineConfig undefined (type *pipelinespec.PlatformSpec has no field or method PipelineConfig)

Check failure on line 313 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

t.platformSpec.PipelineConfig undefined (type *pipelinespec.PlatformSpec has no field or method PipelineConfig)
glog.Info("Found pipeline config")
pipeline_options = &argocompiler.Options{
TtlSeconds: *t.platformSpec.PipelineConfig.Ttl,

Check failure on line 316 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

t.platformSpec.PipelineConfig undefined (type *pipelinespec.PlatformSpec has no field or method PipelineConfig)

Check failure on line 316 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

t.platformSpec.PipelineConfig undefined (type *pipelinespec.PlatformSpec has no field or method PipelineConfig)
}
}

var obj interface{}
if util.CurrentExecutionType() == util.ArgoWorkflow {
obj, err = argocompiler.Compile(job, kubernetesSpec, nil)
obj, err = argocompiler.Compile(job, kubernetesSpec, pipeline_options)
} else if util.CurrentExecutionType() == util.TektonPipelineRun {
obj, err = tektoncompiler.Compile(job, kubernetesSpec, nil)
}
Expand Down Expand Up @@ -344,7 +361,7 @@ func IsPlatformSpecWithKubernetesConfig(template []byte) bool {
return false
}
_, ok := platformSpec.Platforms["kubernetes"]
return ok
return ok || platformSpec.PipelineConfig != nil

Check failure on line 364 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platformSpec.PipelineConfig undefined (type pipelinespec.PlatformSpec has no field or method PipelineConfig)

Check failure on line 364 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platformSpec.PipelineConfig undefined (type pipelinespec.PlatformSpec has no field or method PipelineConfig)
}

func (t *V2Spec) validatePipelineJobInputs(job *pipelinespec.PipelineJob) error {
Expand Down
13 changes: 13 additions & 0 deletions backend/src/v2/compiler/argocompiler/argo.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,13 @@ type Options struct {
// optional
PipelineRoot string
// TODO(Bobgy): add an option -- dev mode, ImagePullPolicy should only be Always in dev mode.
TtlSeconds int32
}

const (
pipeline_default_ttlSeconds = int32(30)
)

func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.SinglePlatformSpec, opts *Options) (*wfapi.Workflow, error) {
// clone jobArg, because we don't want to change it
jobMsg := proto.Clone(jobArg)
Expand Down Expand Up @@ -86,6 +91,11 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
}
}

pipeline_ttlseconds := pipeline_default_ttlSeconds
if &opts.TtlSeconds != nil {
pipeline_ttlseconds = opts.TtlSeconds
}

// initialization
wf := &wfapi.Workflow{
TypeMeta: k8smeta.TypeMeta{
Expand Down Expand Up @@ -117,6 +127,9 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
},
ServiceAccountName: "pipeline-runner",
Entrypoint: tmplEntrypoint,
TTLStrategy: &wfapi.TTLStrategy{
SecondsAfterCompletion: &pipeline_ttlseconds,
},
},
}
c := &workflowCompiler{
Expand Down
17 changes: 14 additions & 3 deletions sdk/python/kfp/compiler/pipeline_spec_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from kfp.dsl import component_factory
from kfp.dsl import for_loop
from kfp.dsl import pipeline_channel
from kfp.dsl import pipeline_config
from kfp.dsl import pipeline_context
from kfp.dsl import pipeline_task
from kfp.dsl import placeholders
Expand Down Expand Up @@ -1879,7 +1880,7 @@ def create_pipeline_spec(
pipeline: pipeline_context.Pipeline,
component_spec: structures.ComponentSpec,
pipeline_outputs: Optional[Any] = None,
ttl_strategy_seconds_after_completion: Optional[int] = None,
pipeline_config: Optional[pipeline_config.PipelineConfig] = None,
) -> Tuple[pipeline_spec_pb2.PipelineSpec, pipeline_spec_pb2.PlatformSpec]:
"""Creates a pipeline spec object.
Expand Down Expand Up @@ -1947,9 +1948,10 @@ def create_pipeline_spec(
group_name_to_group=group_name_to_group,
condition_channels=condition_channels,
)
import pdb; pdb.set_trace()

platform_spec = pipeline_spec_pb2.PlatformSpec()
platform_spec.ttlStrategysecondsAfterCompletion = ttl_strategy_seconds_after_completion
if pipeline_config is not None:
_merge_pipeline_config(pipelineConfig=pipeline_config, platformSpec=platform_spec)
for group in all_groups:
build_spec_by_group(
pipeline_spec=pipeline_spec,
Expand Down Expand Up @@ -2063,7 +2065,16 @@ def write_pipeline_spec_to_file(
else:
raise ValueError(
f'The output path {package_path} should end with ".yaml".')

def _merge_pipeline_config(pipelineConfig: pipeline_config.PipelineConfig, platformSpec: pipeline_spec_pb2.PlatformSpec):
pipeline_config_json = json_format.ParseDict(
{
"pipelineConfig": {
"pipelineTtl": pipelineConfig.get_ttl(),
}
}, platformSpec.platforms.get('kubernetes', {}))

return platformSpec

def extract_comments_from_pipeline_spec(pipeline_spec: dict,
pipeline_description: str) -> str:
Expand Down
8 changes: 4 additions & 4 deletions sdk/python/kfp/dsl/component_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from typing import Callable, List, Optional
import warnings

from kfp.dsl import component_factory
from kfp.dsl import component_factory, pipeline_config


def component(func: Optional[Callable] = None,
Expand All @@ -28,7 +28,7 @@ def component(func: Optional[Callable] = None,
output_component_file: Optional[str] = None,
install_kfp_package: bool = True,
kfp_package_path: Optional[str] = None,
ttl_active_seconds: Optional[int] = None):
pipeline_config: Optional[pipeline_config.PipelineConfig] = None):
"""Decorator for Python-function based components.
A KFP component can either be a lightweight component or a containerized
Expand Down Expand Up @@ -116,7 +116,7 @@ def pipeline():
output_component_file=output_component_file,
install_kfp_package=install_kfp_package,
kfp_package_path=kfp_package_path,
ttl_active_seconds=ttl_active_seconds)
pipeline_config=pipeline_config)

return component_factory.create_component_from_func(
func,
Expand All @@ -127,4 +127,4 @@ def pipeline():
output_component_file=output_component_file,
install_kfp_package=install_kfp_package,
kfp_package_path=kfp_package_path,
ttl_active_seconds=ttl_active_seconds)
pipeline_config=pipeline_config)
9 changes: 5 additions & 4 deletions sdk/python/kfp/dsl/component_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from kfp.dsl import container_component_artifact_channel
from kfp.dsl import container_component_class
from kfp.dsl import graph_component
from kfp.dsl import pipeline_config
from kfp.dsl import placeholders
from kfp.dsl import python_component
from kfp.dsl import structures
Expand Down Expand Up @@ -534,14 +535,14 @@ def create_component_from_func(
install_kfp_package: bool = True,
kfp_package_path: Optional[str] = None,
pip_trusted_hosts: Optional[List[str]] = None,
ttl_active_seconds: Optional[int] = None,
pipeline_config: Optional[pipeline_config.PipelineConfig] = None,
) -> python_component.PythonComponent:
"""Implementation for the @component decorator.
The decorator is defined under component_decorator.py. See the
decorator for the canonical documentation for this function.
"""

packages_to_install_command = _get_packages_to_install_command(
install_kfp_package=install_kfp_package,
target_image=target_image,
Expand Down Expand Up @@ -678,7 +679,7 @@ def create_graph_component_from_func(
name: Optional[str] = None,
description: Optional[str] = None,
display_name: Optional[str] = None,
ttl_strategy_seconds_after_completion: Optional[int] = None,
pipeline_config: Optional[pipeline_config.PipelineConfig] = None,
) -> graph_component.GraphComponent:
"""Implementation for the @pipeline decorator.
Expand All @@ -695,7 +696,7 @@ def create_graph_component_from_func(
component_spec=component_spec,
pipeline_func=func,
display_name=display_name,
ttl_strategy_seconds_after_completion=ttl_strategy_seconds_after_completion,
pipeline_config=pipeline_config,
)


Expand Down
5 changes: 3 additions & 2 deletions sdk/python/kfp/dsl/graph_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from kfp.compiler import pipeline_spec_builder as builder
from kfp.dsl import base_component
from kfp.dsl import pipeline_channel
from kfp.dsl import pipeline_config
from kfp.dsl import pipeline_context
from kfp.dsl import structures
from kfp.pipeline_spec import pipeline_spec_pb2
Expand All @@ -37,7 +38,7 @@ def __init__(
component_spec: structures.ComponentSpec,
pipeline_func: Callable,
display_name: Optional[str] = None,
ttl_strategy_seconds_after_completion: Optional[int] = None,
pipeline_config: Optional[pipeline_config.PipelineConfig] = None,
):
super().__init__(component_spec=component_spec)
self.pipeline_func = pipeline_func
Expand Down Expand Up @@ -70,7 +71,7 @@ def __init__(
pipeline=dsl_pipeline,
component_spec=self.component_spec,
pipeline_outputs=pipeline_outputs,
ttl_strategy_seconds_after_completion=ttl_strategy_seconds_after_completion,
pipeline_config=pipeline_config,
)

pipeline_root = getattr(pipeline_func, 'pipeline_root', None)
Expand Down
28 changes: 28 additions & 0 deletions sdk/python/kfp/dsl/pipeline_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Copyright 2024 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Pipeline-level config options."""


class PipelineConfig:
"""PipelineConfig contains pipeline-level config options."""

def __init__(self):
pass

# TODO add pipeline level configs
def set_ttl(self, ttl: int):
self.__ttl = ttl

def get_ttl(self) -> int:
return int(self.__ttl)
9 changes: 5 additions & 4 deletions sdk/python/kfp/dsl/pipeline_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from typing import Callable, Optional

from kfp.dsl import component_factory
from kfp.dsl import pipeline_config
from kfp.dsl import pipeline_task
from kfp.dsl import tasks_group
from kfp.dsl import utils
Expand All @@ -29,7 +30,7 @@ def pipeline(func: Optional[Callable] = None,
description: Optional[str] = None,
pipeline_root: Optional[str] = None,
display_name: Optional[str] = None,
ttl_active_seconds: Optional[int] = None) -> Callable:
pipeline_config: Optional[pipeline_config.PipelineConfig] = None) -> Callable:
"""Decorator used to construct a pipeline.
Example
Expand All @@ -51,7 +52,7 @@ def my_pipeline(a: str, b: int):
pipeline_root: The root directory from which to read input and output
parameters and artifacts.
display_name: A human-readable name for the pipeline.
ttl_active_seconds: Delete completed workflows after a set time.
pipeline_config: Pipeline-level configuration.
"""
if func is None:
return functools.partial(
Expand All @@ -60,7 +61,7 @@ def my_pipeline(a: str, b: int):
description=description,
pipeline_root=pipeline_root,
display_name=display_name,
ttl_active_seconds=ttl_active_seconds,
pipeline_config=pipeline_config,
)

if pipeline_root:
Expand All @@ -71,7 +72,7 @@ def my_pipeline(a: str, b: int):
name=name,
description=description,
display_name=display_name,
ttl_strategy_seconds_after_completion=ttl_active_seconds,
pipeline_config=pipeline_config,
)


Expand Down

0 comments on commit 479f72e

Please sign in to comment.