Skip to content

Commit

Permalink
VAI Provider - Namespaced custom resources for pipelines (scheduled a…
Browse files Browse the repository at this point in the history
…nd one off) (#326)

* Change to PipelineDefinition Name to be NamespacedName

* Change compiler to handle namespaced pipeline names

* Change pipeline storage location following namespaced name change

* Change one off runs to retrieve pipeline name from job spec

* Change to use the suffix from the run definition name rather than the version, as the realised version is not unique

* Change run schedule definition to use namespace
  • Loading branch information
grahamia authored May 20, 2024
1 parent b1d074a commit c009d4a
Show file tree
Hide file tree
Showing 16 changed files with 249 additions and 61 deletions.
4 changes: 2 additions & 2 deletions argo/kfp-compiler/acceptance/pipeline_conf.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
name: test
name: namespace/test
image: test-pipeline
tfxComponents: pipeline.create_components
env:
- name: foo
value: bar
beamArgs:
- name: anArg
value: aValue
value: aValue
1 change: 1 addition & 0 deletions argo/kfp-compiler/acceptance/test_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def test_cli_v2():
f = open(output_file_path, "r")
pipeline = yaml.safe_load(f.read())
assert pipeline['pipelineSpec']['schemaVersion'] == '2.0.0'
assert pipeline['pipelineSpec']['pipelineInfo']['name'] == "namespace-test"


def test_failure():
Expand Down
12 changes: 9 additions & 3 deletions argo/kfp-compiler/kfp_compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ def load_fn(tfx_components: str, env: list):
return fn


def sanitise_namespaced_pipeline_name(namespaced_name: str) -> str:
    """Return *namespaced_name* with every ``/`` replaced by ``-``.

    Pipeline names in the configuration may be written as ``namespace/name``;
    the flattened result is what is passed on as ``pipeline_name`` when the
    pipeline is compiled. A name without ``/`` (including the empty string)
    is returned unchanged.
    """
    return namespaced_name.replace("/", "-")


@click.command()
@click.option('--pipeline_config', help='Pipeline configuration in yaml format', required=True)
@click.option('--provider_config', help='Provider configuration in yaml format', required=True)
Expand All @@ -78,9 +82,11 @@ def compile(pipeline_config: str, provider_config: str, output_file: str):
pipeline_config_contents = yaml.safe_load(pipeline_stream)
provider_config_contents = yaml.safe_load(provider_stream)

click.secho(f'Compiling with pipeline: {pipeline_config_contents} and provider {provider_config_contents} ', fg='green')
click.secho(f'Compiling with pipeline: {pipeline_config_contents} and provider {provider_config_contents} ',
fg='green')

pipeline_root, serving_model_directory, temp_location = pipeline_paths_for_config(pipeline_config_contents, provider_config_contents)
pipeline_root, serving_model_directory, temp_location = pipeline_paths_for_config(pipeline_config_contents,
provider_config_contents)

beam_args = provider_config_contents.get('defaultBeamArgs', [])
beam_args.extend(pipeline_config_contents.get('beamArgs', []))
Expand All @@ -94,7 +100,7 @@ def compile(pipeline_config: str, provider_config: str, output_file: str):

compile_fn(pipeline_config_contents, output_file).run(
pipeline.Pipeline(
pipeline_name=pipeline_config_contents['name'],
pipeline_name=sanitise_namespaced_pipeline_name(pipeline_config_contents['name']),
pipeline_root=pipeline_root,
components=expanded_components,
enable_cache=False,
Expand Down
7 changes: 7 additions & 0 deletions argo/kfp-compiler/tests/test_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,10 @@ def test_pipeline_paths_for_config():
assert pipeline_root == "pipeline_root/pipeline"
assert serving_model_directory == "pipeline_root/pipeline/serving"
assert temp_directory == "pipeline_root/pipeline/tmp"


def test_sanitise_namespaced_pipeline_name():
    """Slashes are replaced by hyphens; slash-free names are left untouched."""
    # Maps each raw name to the value the sanitiser is expected to return.
    cases = {
        "pipeline-name": "pipeline-name",
        "/pipeline-name": "-pipeline-name",
        "mlops/pipeline-name": "mlops-pipeline-name",
        "": "",
    }
    for raw_name, expected in cases.items():
        actual = compiler.sanitise_namespaced_pipeline_name(raw_name)
        # Name the offending input so a failure is easy to locate in the table.
        assert actual == expected, f"unexpected result for {raw_name!r}"
14 changes: 7 additions & 7 deletions argo/providers/base/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import (
)

type PipelineDefinition struct {
Name string `yaml:"name"`
Version string `yaml:"version"`
Image string `yaml:"image"`
TfxComponents string `yaml:"tfxComponents"`
Env []apis.NamedValue `yaml:"env"`
BeamArgs []apis.NamedValue `yaml:"beamArgs"`
Name common.NamespacedName `yaml:"name"`
Version string `yaml:"version"`
Image string `yaml:"image"`
TfxComponents string `yaml:"tfxComponents"`
Env []apis.NamedValue `yaml:"env"`
BeamArgs []apis.NamedValue `yaml:"beamArgs"`
}

type ExperimentDefinition struct {
Expand All @@ -24,7 +24,7 @@ type ExperimentDefinition struct {
}

type RunScheduleDefinition struct {
Name string `yaml:"name"`
Name common.NamespacedName `yaml:"name"`
Version string `yaml:"version"`
PipelineName common.NamespacedName `yaml:"pipelineName"`
PipelineVersion string `yaml:"pipelineVersion"`
Expand Down
4 changes: 2 additions & 2 deletions argo/providers/kfp/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (kfpp KfpProvider) CreatePipeline(ctx context.Context, providerConfig KfpPr
}

result, err := pipelineUploadService.UploadPipeline(&pipeline_upload_service.UploadPipelineParams{
Name: &pipelineDefinition.Name,
Name: &pipelineDefinition.Name.Name,
Uploadfile: runtime.NamedReader(pipelineFileName, reader),
Context: ctx,
}, nil)
Expand Down Expand Up @@ -234,7 +234,7 @@ func (kfpp KfpProvider) CreateRunSchedule(ctx context.Context, providerConfig Kf
Parameters: jobParameters,
},
Description: string(runScheduleAsDescription),
Name: runScheduleDefinition.Name,
Name: runScheduleDefinition.Name.Name,
MaxConcurrency: 1,
Enabled: true,
NoCatchup: true,
Expand Down
15 changes: 8 additions & 7 deletions argo/providers/stub/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"github.com/argoproj/argo-events/eventsources/sources/generic"
"github.com/sky-uk/kfp-operator/argo/common"
"github.com/sky-uk/kfp-operator/argo/providers/base"
)

Expand All @@ -14,8 +15,8 @@ type StubProviderConfig struct {
}

type ResourceDefinition struct {
Name string `yaml:"name"`
Version string `yaml:"version"`
Name common.NamespacedName `yaml:"name"`
Version string `yaml:"version"`
}

type ExpectedInput struct {
Expand Down Expand Up @@ -71,31 +72,31 @@ func (s StubProvider) DeletePipeline(_ context.Context, providerConfig StubProvi
}

func (s StubProvider) CreateRun(_ context.Context, providerConfig StubProviderConfig, resourceDefinition base.RunDefinition) (string, error) {
return verifyCreateCall(providerConfig, ResourceDefinition{resourceDefinition.Name.Name, resourceDefinition.Version})
return verifyCreateCall(providerConfig, ResourceDefinition{resourceDefinition.Name, resourceDefinition.Version})
}

// DeleteRun hands the provider config and the run id to verifyDeleteCall and
// returns whatever error that check produces. The context argument is ignored.
func (s StubProvider) DeleteRun(_ context.Context, providerConfig StubProviderConfig, id string) error {
return verifyDeleteCall(providerConfig, id)
}

func (s StubProvider) CreateRunSchedule(_ context.Context, providerConfig StubProviderConfig, resourceDefinition base.RunScheduleDefinition) (string, error) {
return verifyCreateCall(providerConfig, ResourceDefinition{resourceDefinition.Name, resourceDefinition.Version})
return verifyCreateCall(providerConfig, ResourceDefinition{Name: resourceDefinition.Name, Version: resourceDefinition.Version})
}

func (s StubProvider) UpdateRunSchedule(_ context.Context, providerConfig StubProviderConfig, resourceDefinition base.RunScheduleDefinition, id string) (string, error) {
return verifyUpdateCall(providerConfig, ResourceDefinition{resourceDefinition.Name, resourceDefinition.Version}, id)
return verifyUpdateCall(providerConfig, ResourceDefinition{Name: resourceDefinition.Name, Version: resourceDefinition.Version}, id)
}

// DeleteRunSchedule hands the provider config and the run schedule id to
// verifyDeleteCall and returns whatever error that check produces. The context
// argument is ignored.
func (s StubProvider) DeleteRunSchedule(_ context.Context, providerConfig StubProviderConfig, id string) error {
return verifyDeleteCall(providerConfig, id)
}

func (s StubProvider) CreateExperiment(_ context.Context, providerConfig StubProviderConfig, resourceDefinition base.ExperimentDefinition) (string, error) {
return verifyCreateCall(providerConfig, ResourceDefinition{resourceDefinition.Name, resourceDefinition.Version})
return verifyCreateCall(providerConfig, ResourceDefinition{common.NamespacedName{Name: resourceDefinition.Name}, resourceDefinition.Version})
}

func (s StubProvider) UpdateExperiment(_ context.Context, providerConfig StubProviderConfig, resourceDefinition base.ExperimentDefinition, id string) (string, error) {
return verifyUpdateCall(providerConfig, ResourceDefinition{resourceDefinition.Name, resourceDefinition.Version}, id)
return verifyUpdateCall(providerConfig, ResourceDefinition{common.NamespacedName{Name: resourceDefinition.Name}, resourceDefinition.Version}, id)
}

func (s StubProvider) DeleteExperiment(_ context.Context, providerConfig StubProviderConfig, id string) error {
Expand Down
22 changes: 13 additions & 9 deletions argo/providers/vai/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package vai

import (
"fmt"
"strings"
"github.com/sky-uk/kfp-operator/argo/common"
)

type VAIProviderConfig struct {
Expand All @@ -28,16 +28,20 @@ func (vaipc VAIProviderConfig) pipelineJobName(name string) string {
return fmt.Sprintf("%s/pipelineJobs/%s", vaipc.parent(), name)
}

func (vaipc VAIProviderConfig) pipelineStorageObject(pipelineName string, pipelineVersion string) string {
return fmt.Sprintf("%s/%s", pipelineName, pipelineVersion)
}

func (vaipc VAIProviderConfig) gcsUri(bucket string, pathSegments ...string) string {
return fmt.Sprintf("gs://%s/%s", bucket, strings.Join(pathSegments, "/"))
// pipelineStorageObject builds the storage object path for one version of a
// pipeline: the string form of pipelineName, a "/", then pipelineVersion.
// If pipelineName cannot be rendered as a string, that error is returned
// together with an empty path.
func (vaipc VAIProviderConfig) pipelineStorageObject(pipelineName common.NamespacedName, pipelineVersion string) (string, error) {
	qualifiedName, err := pipelineName.String()
	if err != nil {
		return "", err
	}

	storageObject := fmt.Sprintf("%s/%s", qualifiedName, pipelineVersion)
	return storageObject, nil
}

func (vaipc VAIProviderConfig) pipelineUri(pipelineName string, pipelineVersion string) string {
return vaipc.gcsUri(vaipc.PipelineBucket, vaipc.pipelineStorageObject(pipelineName, pipelineVersion))
// pipelineUri returns the gs:// URI of one version of a pipeline, formed from
// the configured PipelineBucket and the object path produced by
// pipelineStorageObject. An error from pipelineStorageObject is passed through
// with an empty URI.
func (vaipc VAIProviderConfig) pipelineUri(pipelineName common.NamespacedName, pipelineVersion string) (string, error) {
	storageObject, err := vaipc.pipelineStorageObject(pipelineName, pipelineVersion)
	if err != nil {
		return "", err
	}

	uri := fmt.Sprintf("gs://%s/%s", vaipc.PipelineBucket, storageObject)
	return uri, nil
}

func (vaipc VAIProviderConfig) getMaxConcurrentRunCountOrDefault() int64 {
Expand Down
48 changes: 48 additions & 0 deletions argo/providers/vai/config_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,52 @@ var _ = Context("VAI Config", func() {
Entry("", -common.RandomInt64(), true),
Entry("", common.RandomInt64()+1, false),
)

// Table for pipelineStorageObject. An empty expectedStorageObject marks an
// entry for which the call must return an error; otherwise the call must
// succeed and yield exactly that path.
DescribeTable("pipelineStorageObject", func(pipelineName common.NamespacedName, pipelineVersion string, expectedStorageObject string) {
	actual, err := config.pipelineStorageObject(pipelineName, pipelineVersion)

	if expectedStorageObject == "" {
		Expect(err).To(HaveOccurred())
		return
	}

	Expect(err).NotTo(HaveOccurred())
	Expect(actual).To(Equal(expectedStorageObject))
},
	// Name and namespace both set.
	Entry("", common.NamespacedName{Name: "myName", Namespace: "myNamespace"}, "version", "myNamespace/myName/version"),
	// Namespace empty: only the name appears in the path.
	Entry("", common.NamespacedName{Name: "myName", Namespace: ""}, "version", "myName/version"),
	// Name empty: an error is expected.
	Entry("", common.NamespacedName{Name: "", Namespace: "myNamespace"}, "version", ""),
)

// Table for pipelineUri. Each entry first sets config.PipelineBucket. An empty
// expectedStorageObject marks an entry for which the call must return an
// error; otherwise the call must succeed and yield exactly that URI.
DescribeTable("pipelineUri", func(bucket string, pipelineName common.NamespacedName, pipelineVersion string, expectedStorageObject string) {
	config.PipelineBucket = bucket

	actualUri, err := config.pipelineUri(pipelineName, pipelineVersion)

	if expectedStorageObject == "" {
		Expect(err).To(HaveOccurred())
		return
	}

	Expect(err).NotTo(HaveOccurred())
	Expect(actualUri).To(Equal(expectedStorageObject))
},
	// Bucket, name and namespace all set.
	Entry("", "bucket", common.NamespacedName{Name: "myName", Namespace: "myNamespace"}, "version", "gs://bucket/myNamespace/myName/version"),
	// Empty bucket still produces a URI, with an empty bucket segment.
	Entry("", "", common.NamespacedName{Name: "myName", Namespace: "myNamespace"}, "version", "gs:///myNamespace/myName/version"),
	// Name empty: an error is expected.
	Entry("", "bucket", common.NamespacedName{Name: "", Namespace: "myNamespace"}, "version", ""),
)
})
Loading

0 comments on commit c009d4a

Please sign in to comment.