From 978065002c89cc3e8de88aa4790ceee1cf8e98ab Mon Sep 17 00:00:00 2001 From: Giulio Frasca Date: Mon, 13 May 2024 18:33:21 -0400 Subject: [PATCH] test: Add argoCompiler test case to validate individual parallel-limited tasks Signed-off-by: Giulio Frasca --- .../src/v2/compiler/argocompiler/argo_test.go | 5 + .../testdata/multiple_parallel_loops.yaml | 589 ++++++++++++++++++ .../testdata/multiple_parallel_loops.json | 284 +++++++++ 3 files changed, 878 insertions(+) create mode 100755 backend/src/v2/compiler/argocompiler/testdata/multiple_parallel_loops.yaml create mode 100644 backend/src/v2/compiler/testdata/multiple_parallel_loops.json diff --git a/backend/src/v2/compiler/argocompiler/argo_test.go b/backend/src/v2/compiler/argocompiler/argo_test.go index 6c92e54574c2..cd802b5f38b9 100644 --- a/backend/src/v2/compiler/argocompiler/argo_test.go +++ b/backend/src/v2/compiler/argocompiler/argo_test.go @@ -47,6 +47,11 @@ func Test_argo_compiler(t *testing.T) { platformSpecPath: "", argoYAMLPath: "testdata/importer.yaml", }, + { + jobPath: "../testdata/multiple_parallel_loops.json", + platformSpecPath: "", + argoYAMLPath: "testdata/multiple_parallel_loops.yaml", + }, { jobPath: "../testdata/create_mount_delete_dynamic_pvc.json", platformSpecPath: "../testdata/create_mount_delete_dynamic_pvc_platform.json", diff --git a/backend/src/v2/compiler/argocompiler/testdata/multiple_parallel_loops.yaml b/backend/src/v2/compiler/argocompiler/testdata/multiple_parallel_loops.yaml new file mode 100755 index 000000000000..982051b2ad31 --- /dev/null +++ b/backend/src/v2/compiler/argocompiler/testdata/multiple_parallel_loops.yaml @@ -0,0 +1,589 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + annotations: + pipelines.kubeflow.org/components-comp-for-loop-2: '{"dag":{"tasks":{"print-op":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-print-op"},"inputs":{"parameters":{"s":{"componentInputParameter":"pipelinechannel--loop-item-param-1","parameterExpressionSelector":"parseJson(string_value)[\"A_a\"]"}}},"taskInfo":{"name":"print-op"}},"print-op-2":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-print-op-2"},"inputs":{"parameters":{"s":{"componentInputParameter":"pipelinechannel--loop-item-param-1","parameterExpressionSelector":"parseJson(string_value)[\"B_b\"]"}}},"taskInfo":{"name":"print-op-2"}}}},"inputDefinitions":{"parameters":{"pipelinechannel--loop-item-param-1":{"parameterType":"STRUCT"}}}}' + pipelines.kubeflow.org/components-comp-for-loop-4: '{"dag":{"tasks":{"print-op-3":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-print-op-3"},"inputs":{"parameters":{"s":{"componentInputParameter":"pipelinechannel--loop-item-param-3","parameterExpressionSelector":"parseJson(string_value)[\"A_a\"]"}}},"taskInfo":{"name":"print-op-3"}},"print-op-4":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-print-op-4"},"inputs":{"parameters":{"s":{"componentInputParameter":"pipelinechannel--loop-item-param-3","parameterExpressionSelector":"parseJson(string_value)[\"B_b\"]"}}},"taskInfo":{"name":"print-op-4"}}}},"inputDefinitions":{"parameters":{"pipelinechannel--loop-item-param-3":{"parameterType":"STRUCT"}}}}' + pipelines.kubeflow.org/components-comp-print-op: '{"executorLabel":"exec-print-op","inputDefinitions":{"parameters":{"s":{"parameterType":"STRING"}}}}' + pipelines.kubeflow.org/components-comp-print-op-2: '{"executorLabel":"exec-print-op-2","inputDefinitions":{"parameters":{"s":{"parameterType":"STRING"}}}}' + pipelines.kubeflow.org/components-comp-print-op-3: '{"executorLabel":"exec-print-op-3","inputDefinitions":{"parameters":{"s":{"parameterType":"STRING"}}}}' + pipelines.kubeflow.org/components-comp-print-op-4: '{"executorLabel":"exec-print-op-4","inputDefinitions":{"parameters":{"s":{"parameterType":"STRING"}}}}' + pipelines.kubeflow.org/components-root: '{"dag":{"tasks":{"for-loop-2":{"componentRef":{"name":"comp-for-loop-2"},"iteratorPolicy":{"parallelismLimit":2},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-1","items":{"raw":"[{\"A_a\": + \"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\": \"3\", + \"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\": \"5\", \"B_b\": + \"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\": \"7\", \"B_b\": \"70\"}, + {\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\": \"9\", \"B_b\": \"90\"}, {\"A_a\": + \"10\", \"B_b\": \"100\"}]"}},"taskInfo":{"name":"chicken"}},"for-loop-4":{"componentRef":{"name":"comp-for-loop-4"},"iteratorPolicy":{"parallelismLimit":4},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-3","items":{"raw":"[{\"A_a\": + \"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\": \"3\", + \"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\": \"5\", \"B_b\": + \"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\": \"7\", \"B_b\": \"70\"}, + {\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\": \"9\", \"B_b\": \"90\"}, {\"A_a\": + \"10\", \"B_b\": \"100\"}]"}},"taskInfo":{"name":"muffin"}}}}}' + pipelines.kubeflow.org/implementations-comp-print-op: '{"args":["--executor_input","{{$}}","--function_to_execute","print_op"],"command":["sh","-c","\nif + ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m + ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 + python3 -m pip install --quiet --no-warn-script-location ''kfp==2.7.0'' ''--no-deps'' + ''typing-extensions\u003e=3.7.4,\u003c5; python_version\u003c\"3.9\"'' \u0026\u0026 + \"$0\" \"$@\"\n","sh","-ec","program_path=$(mktemp -d)\n\nprintf \"%s\" \"$0\" + \u003e \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3 -m + kfp.dsl.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n","\nimport + kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef + print_op(s: str):\n print(s)\n\n"],"image":"python:3.7"}' + pipelines.kubeflow.org/implementations-comp-print-op-2: '{"args":["--executor_input","{{$}}","--function_to_execute","print_op"],"command":["sh","-c","\nif + ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m + ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 + python3 -m pip install --quiet --no-warn-script-location ''kfp==2.7.0'' ''--no-deps'' + ''typing-extensions\u003e=3.7.4,\u003c5; python_version\u003c\"3.9\"'' \u0026\u0026 + \"$0\" \"$@\"\n","sh","-ec","program_path=$(mktemp -d)\n\nprintf \"%s\" \"$0\" + \u003e \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3 -m + kfp.dsl.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n","\nimport + kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef + print_op(s: str):\n print(s)\n\n"],"image":"python:3.7"}' + pipelines.kubeflow.org/implementations-comp-print-op-3: '{"args":["--executor_input","{{$}}","--function_to_execute","print_op"],"command":["sh","-c","\nif + ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m + ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 + python3 -m pip install --quiet --no-warn-script-location ''kfp==2.7.0'' ''--no-deps'' + ''typing-extensions\u003e=3.7.4,\u003c5; python_version\u003c\"3.9\"'' \u0026\u0026 + \"$0\" \"$@\"\n","sh","-ec","program_path=$(mktemp -d)\n\nprintf \"%s\" \"$0\" + \u003e \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3 -m + kfp.dsl.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n","\nimport + kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef + print_op(s: str):\n print(s)\n\n"],"image":"python:3.7"}' + pipelines.kubeflow.org/implementations-comp-print-op-4: '{"args":["--executor_input","{{$}}","--function_to_execute","print_op"],"command":["sh","-c","\nif + ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m + ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 + python3 -m pip install --quiet --no-warn-script-location ''kfp==2.7.0'' ''--no-deps'' + ''typing-extensions\u003e=3.7.4,\u003c5; python_version\u003c\"3.9\"'' \u0026\u0026 + \"$0\" \"$@\"\n","sh","-ec","program_path=$(mktemp -d)\n\nprintf \"%s\" \"$0\" + \u003e \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3 -m + kfp.dsl.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n","\nimport + kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef + print_op(s: str):\n print(s)\n\n"],"image":"python:3.7"}' + creationTimestamp: null + generateName: my-pipeline- +spec: + arguments: {} + entrypoint: entrypoint + podMetadata: + annotations: + pipelines.kubeflow.org/v2_component: "true" + labels: + pipelines.kubeflow.org/v2_component: "true" + serviceAccountName: pipeline-runner + templates: + - container: + args: + - --type + - CONTAINER + - --pipeline_name + - my-pipeline + - --run_id + - '{{workflow.uid}}' + - --dag_execution_id + - '{{inputs.parameters.parent-dag-id}}' + - --component + - '{{inputs.parameters.component}}' + - --task + - '{{inputs.parameters.task}}' + - --container + - '{{inputs.parameters.container}}' + - --iteration_index + - '{{inputs.parameters.iteration-index}}' + - --cached_decision_path + - '{{outputs.parameters.cached-decision.path}}' + - --pod_spec_patch_path + - '{{outputs.parameters.pod-spec-patch.path}}' + - --condition_path + - '{{outputs.parameters.condition.path}}' + - --kubernetes_config + - '{{inputs.parameters.kubernetes-config}}' + command: + - driver + image: gcr.io/ml-pipeline/kfp-driver + name: "" + resources: + limits: + cpu: 500m + memory: 512Mi + requests: + cpu: 100m + memory: 64Mi + inputs: + parameters: + - name: component + - name: task + - name: container + - name: parent-dag-id + - default: "-1" + name: iteration-index + - default: "" + name: kubernetes-config + metadata: {} + name: system-container-driver + outputs: + parameters: + - name: pod-spec-patch + valueFrom: + default: "" + path: /tmp/outputs/pod-spec-patch + - default: "false" + name: cached-decision + valueFrom: + default: "false" + path: /tmp/outputs/cached-decision + - name: condition + valueFrom: + default: "true" + path: /tmp/outputs/condition + - dag: + tasks: + - arguments: + parameters: + - name: pod-spec-patch + value: '{{inputs.parameters.pod-spec-patch}}' + name: executor + template: system-container-impl + when: '{{inputs.parameters.cached-decision}} != true' + inputs: + parameters: + - name: pod-spec-patch + - default: "false" + name: cached-decision + metadata: {} + name: system-container-executor + outputs: {} + - container: + command: + - should-be-overridden-during-runtime + env: + - name: KFP_POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: KFP_POD_UID + valueFrom: + fieldRef: + fieldPath: metadata.uid + envFrom: + - configMapRef: + name: metadata-grpc-configmap + optional: true + image: gcr.io/ml-pipeline/should-be-overridden-during-runtime + name: "" + resources: {} + volumeMounts: + - mountPath: /kfp-launcher + name: kfp-launcher + initContainers: + - command: + - launcher-v2 + - --copy + - /kfp-launcher/launch + image: gcr.io/ml-pipeline/kfp-launcher + name: kfp-launcher + resources: + limits: + cpu: 500m + memory: 128Mi + requests: + cpu: 100m + volumeMounts: + - mountPath: /kfp-launcher + name: kfp-launcher + inputs: + parameters: + - name: pod-spec-patch + metadata: {} + name: system-container-impl + outputs: {} + podSpecPatch: '{{inputs.parameters.pod-spec-patch}}' + volumes: + - emptyDir: {} + name: kfp-launcher + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-print-op}}' + - name: task + value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-print-op"},"inputs":{"parameters":{"s":{"componentInputParameter":"pipelinechannel--loop-item-param-1","parameterExpressionSelector":"parseJson(string_value)[\"A_a\"]"}}},"taskInfo":{"name":"print-op"}}' + - name: container + value: '{{workflow.annotations.pipelines.kubeflow.org/implementations-comp-print-op}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + name: print-op-driver + template: system-container-driver + - arguments: + parameters: + - name: pod-spec-patch + value: '{{tasks.print-op-driver.outputs.parameters.pod-spec-patch}}' + - default: "false" + name: cached-decision + value: '{{tasks.print-op-driver.outputs.parameters.cached-decision}}' + depends: print-op-driver.Succeeded + name: print-op + template: system-container-executor + - arguments: + parameters: + - name: component + value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-print-op-2}}' + - name: task + value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-print-op-2"},"inputs":{"parameters":{"s":{"componentInputParameter":"pipelinechannel--loop-item-param-1","parameterExpressionSelector":"parseJson(string_value)[\"B_b\"]"}}},"taskInfo":{"name":"print-op-2"}}' + - name: container + value: '{{workflow.annotations.pipelines.kubeflow.org/implementations-comp-print-op-2}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + name: print-op-2-driver + template: system-container-driver + - arguments: + parameters: + - name: pod-spec-patch + value: '{{tasks.print-op-2-driver.outputs.parameters.pod-spec-patch}}' + - default: "false" + name: cached-decision + value: '{{tasks.print-op-2-driver.outputs.parameters.cached-decision}}' + depends: print-op-2-driver.Succeeded + name: print-op-2 + template: system-container-executor + inputs: + parameters: + - name: parent-dag-id + metadata: {} + name: comp-for-loop-2 + outputs: {} + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-print-op-3}}' + - name: task + value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-print-op-3"},"inputs":{"parameters":{"s":{"componentInputParameter":"pipelinechannel--loop-item-param-3","parameterExpressionSelector":"parseJson(string_value)[\"A_a\"]"}}},"taskInfo":{"name":"print-op-3"}}' + - name: container + value: '{{workflow.annotations.pipelines.kubeflow.org/implementations-comp-print-op-3}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + name: print-op-3-driver + template: system-container-driver + - arguments: + parameters: + - name: pod-spec-patch + value: '{{tasks.print-op-3-driver.outputs.parameters.pod-spec-patch}}' + - default: "false" + name: cached-decision + value: '{{tasks.print-op-3-driver.outputs.parameters.cached-decision}}' + depends: print-op-3-driver.Succeeded + name: print-op-3 + template: system-container-executor + - arguments: + parameters: + - name: component + value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-print-op-4}}' + - name: task + value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-print-op-4"},"inputs":{"parameters":{"s":{"componentInputParameter":"pipelinechannel--loop-item-param-3","parameterExpressionSelector":"parseJson(string_value)[\"B_b\"]"}}},"taskInfo":{"name":"print-op-4"}}' + - name: container + value: '{{workflow.annotations.pipelines.kubeflow.org/implementations-comp-print-op-4}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + name: print-op-4-driver + template: system-container-driver + - arguments: + parameters: + - name: pod-spec-patch + value: '{{tasks.print-op-4-driver.outputs.parameters.pod-spec-patch}}' + - default: "false" + name: cached-decision + value: '{{tasks.print-op-4-driver.outputs.parameters.cached-decision}}' + depends: print-op-4-driver.Succeeded + name: print-op-4 + template: system-container-executor + inputs: + parameters: + - name: parent-dag-id + metadata: {} + name: comp-for-loop-4 + outputs: {} + - container: + args: + - --type + - '{{inputs.parameters.driver-type}}' + - --pipeline_name + - my-pipeline + - --run_id + - '{{workflow.uid}}' + - --dag_execution_id + - '{{inputs.parameters.parent-dag-id}}' + - --component + - '{{inputs.parameters.component}}' + - --task + - '{{inputs.parameters.task}}' + - --runtime_config + - '{{inputs.parameters.runtime-config}}' + - --iteration_index + - '{{inputs.parameters.iteration-index}}' + - --execution_id_path + - '{{outputs.parameters.execution-id.path}}' + - --iteration_count_path + - '{{outputs.parameters.iteration-count.path}}' + - --condition_path + - '{{outputs.parameters.condition.path}}' + command: + - driver + image: gcr.io/ml-pipeline/kfp-driver + name: "" + resources: + limits: + cpu: 500m + memory: 512Mi + requests: + cpu: 100m + memory: 64Mi + inputs: + parameters: + - name: component + - default: "" + name: runtime-config + - default: "" + name: task + - default: "0" + name: parent-dag-id + - default: "-1" + name: iteration-index + - default: DAG + name: driver-type + metadata: {} + name: system-dag-driver + outputs: + parameters: + - name: execution-id + valueFrom: + path: /tmp/outputs/execution-id + - name: iteration-count + valueFrom: + default: "0" + path: /tmp/outputs/iteration-count + - name: condition + valueFrom: + default: "true" + path: /tmp/outputs/condition + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-for-loop-2}}' + - name: iteration-index + value: '{{inputs.parameters.iteration-index}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + - name: task + value: '{"componentRef":{"name":"comp-for-loop-2"},"iteratorPolicy":{"parallelismLimit":2},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-1","items":{"raw":"[{\"A_a\": + \"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\": + \"3\", \"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\": + \"5\", \"B_b\": \"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\": + \"7\", \"B_b\": \"70\"}, {\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\": + \"9\", \"B_b\": \"90\"}, {\"A_a\": \"10\", \"B_b\": \"100\"}]"}},"taskInfo":{"name":"chicken"}}' + name: iteration-item-driver + template: system-dag-driver + - arguments: + parameters: + - name: parent-dag-id + value: '{{tasks.iteration-item-driver.outputs.parameters.execution-id}}' + - name: condition + value: '{{tasks.iteration-item-driver.outputs.parameters.condition}}' + depends: iteration-item-driver.Succeeded + name: iteration-item + template: comp-for-loop-2 + inputs: + parameters: + - name: parent-dag-id + - name: iteration-index + metadata: {} + name: comp-for-loop-2-iteration + outputs: {} + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-for-loop-2}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + - name: task + value: '{"componentRef":{"name":"comp-for-loop-2"},"iteratorPolicy":{"parallelismLimit":2},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-1","items":{"raw":"[{\"A_a\": + \"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\": + \"3\", \"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\": + \"5\", \"B_b\": \"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\": + \"7\", \"B_b\": \"70\"}, {\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\": + \"9\", \"B_b\": \"90\"}, {\"A_a\": \"10\", \"B_b\": \"100\"}]"}},"taskInfo":{"name":"chicken"}}' + name: iteration-driver + template: system-dag-driver + - arguments: + parameters: + - name: parent-dag-id + value: '{{tasks.iteration-driver.outputs.parameters.execution-id}}' + - name: iteration-index + value: '{{item}}' + depends: iteration-driver.Succeeded + name: iteration-iterations + template: comp-for-loop-2-iteration + withSequence: + count: '{{tasks.iteration-driver.outputs.parameters.iteration-count}}' + inputs: + parameters: + - name: parent-dag-id + metadata: {} + name: comp-for-loop-2-loop-for-loop-2 + outputs: {} + parallelism: 2 + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-for-loop-4}}' + - name: iteration-index + value: '{{inputs.parameters.iteration-index}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + - name: task + value: '{"componentRef":{"name":"comp-for-loop-4"},"iteratorPolicy":{"parallelismLimit":4},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-3","items":{"raw":"[{\"A_a\": + \"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\": + \"3\", \"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\": + \"5\", \"B_b\": \"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\": + \"7\", \"B_b\": \"70\"}, {\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\": + \"9\", \"B_b\": \"90\"}, {\"A_a\": \"10\", \"B_b\": \"100\"}]"}},"taskInfo":{"name":"muffin"}}' + name: iteration-item-driver + template: system-dag-driver + - arguments: + parameters: + - name: parent-dag-id + value: '{{tasks.iteration-item-driver.outputs.parameters.execution-id}}' + - name: condition + value: '{{tasks.iteration-item-driver.outputs.parameters.condition}}' + depends: iteration-item-driver.Succeeded + name: iteration-item + template: comp-for-loop-4 + inputs: + parameters: + - name: parent-dag-id + - name: iteration-index + metadata: {} + name: comp-for-loop-4-iteration + outputs: {} + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-for-loop-4}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + - name: task + value: '{"componentRef":{"name":"comp-for-loop-4"},"iteratorPolicy":{"parallelismLimit":4},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-3","items":{"raw":"[{\"A_a\": + \"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\": + \"3\", \"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\": + \"5\", \"B_b\": \"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\": + \"7\", \"B_b\": \"70\"}, {\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\": + \"9\", \"B_b\": \"90\"}, {\"A_a\": \"10\", \"B_b\": \"100\"}]"}},"taskInfo":{"name":"muffin"}}' + name: iteration-driver + template: system-dag-driver + - arguments: + parameters: + - name: parent-dag-id + value: '{{tasks.iteration-driver.outputs.parameters.execution-id}}' + - name: iteration-index + value: '{{item}}' + depends: iteration-driver.Succeeded + name: iteration-iterations + template: comp-for-loop-4-iteration + withSequence: + count: '{{tasks.iteration-driver.outputs.parameters.iteration-count}}' + inputs: + parameters: + - name: parent-dag-id + metadata: {} + name: comp-for-loop-4-loop-for-loop-4 + outputs: {} + parallelism: 4 + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-for-loop-2}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + - name: task + value: '{"componentRef":{"name":"comp-for-loop-2"},"iteratorPolicy":{"parallelismLimit":2},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-1","items":{"raw":"[{\"A_a\": + \"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\": + \"3\", \"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\": + \"5\", \"B_b\": \"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\": + \"7\", \"B_b\": \"70\"}, {\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\": + \"9\", \"B_b\": \"90\"}, {\"A_a\": \"10\", \"B_b\": \"100\"}]"}},"taskInfo":{"name":"chicken"}}' + name: for-loop-2-loop-driver + template: system-dag-driver + - arguments: + parameters: + - name: parent-dag-id + value: '{{tasks.for-loop-2-loop-driver.outputs.parameters.execution-id}}' + depends: for-loop-2-loop-driver.Succeeded + name: for-loop-2-loop + template: comp-for-loop-2-loop-for-loop-2 + - arguments: + parameters: + - name: component + value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-for-loop-4}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + - name: task + value: '{"componentRef":{"name":"comp-for-loop-4"},"iteratorPolicy":{"parallelismLimit":4},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-3","items":{"raw":"[{\"A_a\": + \"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\": + \"3\", \"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\": + \"5\", \"B_b\": \"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\": + \"7\", \"B_b\": \"70\"}, {\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\": + \"9\", \"B_b\": \"90\"}, {\"A_a\": \"10\", \"B_b\": \"100\"}]"}},"taskInfo":{"name":"muffin"}}' + name: for-loop-4-loop-driver + template: system-dag-driver + - arguments: + parameters: + - name: parent-dag-id + value: '{{tasks.for-loop-4-loop-driver.outputs.parameters.execution-id}}' + depends: for-loop-4-loop-driver.Succeeded + name: for-loop-4-loop + template: comp-for-loop-4-loop-for-loop-4 + inputs: + parameters: + - name: parent-dag-id + metadata: {} + name: root + outputs: {} + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.annotations.pipelines.kubeflow.org/components-root}}' + - name: runtime-config + value: '{"parameters":{"text":{"stringValue":"hi there"}}}' + - name: driver-type + value: ROOT_DAG + name: root-driver + template: system-dag-driver + - arguments: + parameters: + - name: parent-dag-id + value: '{{tasks.root-driver.outputs.parameters.execution-id}}' + - name: condition + value: "" + depends: root-driver.Succeeded + name: root + template: root + inputs: {} + metadata: {} + name: entrypoint + outputs: {} +status: + finishedAt: null + startedAt: null diff --git a/backend/src/v2/compiler/testdata/multiple_parallel_loops.json b/backend/src/v2/compiler/testdata/multiple_parallel_loops.json new file mode 100644 index 000000000000..0e59a0af2d39 --- /dev/null +++ b/backend/src/v2/compiler/testdata/multiple_parallel_loops.json @@ -0,0 +1,284 @@ +{ + "pipelineSpec" :{ + "components": { + "comp-for-loop-2": { + "dag": { + "tasks": { + "print-op": { + "cachingOptions": { + "enableCache": true + }, + "componentRef": { + "name": "comp-print-op" + }, + "inputs": { + "parameters": { + "s": { + "componentInputParameter": "pipelinechannel--loop-item-param-1", + "parameterExpressionSelector": "parseJson(string_value)[\"A_a\"]" + } + } + }, + "taskInfo": { + "name": "print-op" + } + }, + "print-op-2": { + "cachingOptions": { + "enableCache": true + }, + "componentRef": { + "name": "comp-print-op-2" + }, + "inputs": { + "parameters": { + "s": { + "componentInputParameter": "pipelinechannel--loop-item-param-1", + "parameterExpressionSelector": "parseJson(string_value)[\"B_b\"]" + } + } + }, + "taskInfo": { + "name": "print-op-2" + } + } + } + }, + "inputDefinitions": { + "parameters": { + "pipelinechannel--loop-item-param-1": { + "parameterType": "STRUCT" + } + } + } + }, + "comp-for-loop-4": { + "dag": { + "tasks": { + "print-op-3": { + "cachingOptions": { + "enableCache": true + }, + "componentRef": { + "name": "comp-print-op-3" + }, + "inputs": { + "parameters": { + "s": { + "componentInputParameter": "pipelinechannel--loop-item-param-3", + "parameterExpressionSelector": "parseJson(string_value)[\"A_a\"]" + } + } + }, + "taskInfo": { + "name": "print-op-3" + } + }, + "print-op-4": { + "cachingOptions": { + "enableCache": true + }, + "componentRef": { + "name": "comp-print-op-4" + }, + "inputs": { + "parameters": { + "s": { + "componentInputParameter": "pipelinechannel--loop-item-param-3", + "parameterExpressionSelector": "parseJson(string_value)[\"B_b\"]" + } + } + }, + "taskInfo": { + "name": "print-op-4" + } + } + } + }, + "inputDefinitions": { + "parameters": { + "pipelinechannel--loop-item-param-3": { + "parameterType": "STRUCT" + } + } + } + }, + "comp-print-op": { + "executorLabel": "exec-print-op", + "inputDefinitions": { + "parameters": { + "s": { + "parameterType": "STRING" + } + } + } + }, + "comp-print-op-2": { + "executorLabel": "exec-print-op-2", + "inputDefinitions": { + "parameters": { + "s": { + "parameterType": "STRING" + } + } + } + }, + "comp-print-op-3": { + "executorLabel": "exec-print-op-3", + "inputDefinitions": { + "parameters": { + "s": { + "parameterType": "STRING" + } + } + } + }, + "comp-print-op-4": { + "executorLabel": "exec-print-op-4", + "inputDefinitions": { + "parameters": { + "s": { + "parameterType": "STRING" + } + } + } + } + }, + "deploymentSpec": { + "executors": { + "exec-print-op": { + "container": { + "args": [ + "--executor_input", + "{{$}}", + "--function_to_execute", + "print_op" + ], + "command": [ + "sh", + "-c", + "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"$0\" \"$@\"\n", + "sh", + "-ec", + "program_path=$(mktemp -d)\n\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n", + "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef print_op(s: str):\n print(s)\n\n" + ], + "image": "python:3.7" + } + }, + "exec-print-op-2": { + "container": { + "args": [ + "--executor_input", + "{{$}}", + "--function_to_execute", + "print_op" + ], + "command": [ + "sh", + "-c", + "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"$0\" \"$@\"\n", + "sh", + "-ec", + "program_path=$(mktemp -d)\n\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n", + "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef print_op(s: str):\n print(s)\n\n" + ], + "image": "python:3.7" + } + }, + "exec-print-op-3": { + "container": { + "args": [ + "--executor_input", + "{{$}}", + "--function_to_execute", + "print_op" + ], + "command": [ + "sh", + "-c", + "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"$0\" \"$@\"\n", + "sh", + "-ec", + "program_path=$(mktemp -d)\n\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n", + "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef print_op(s: str):\n print(s)\n\n" + ], + "image": "python:3.7" + } + }, + "exec-print-op-4": { + "container": { + "args": [ + "--executor_input", + "{{$}}", + "--function_to_execute", + "print_op" + ], + "command": [ + "sh", + "-c", + "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"$0\" \"$@\"\n", + "sh", + "-ec", + "program_path=$(mktemp -d)\n\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n", + "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef print_op(s: str):\n print(s)\n\n" + ], + "image": "python:3.7" + } + } + } + }, + "pipelineInfo": { + "name": "my-pipeline" + }, + "root": { + "dag": { + "tasks": { + "for-loop-2": { + "componentRef": { + "name": "comp-for-loop-2" + }, + "iteratorPolicy": { + "parallelismLimit": 2 + }, + "parameterIterator": { + "itemInput": "pipelinechannel--loop-item-param-1", + "items": { + "raw": "[{\"A_a\": \"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\": \"3\", \"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\": \"5\", \"B_b\": \"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\": \"7\", \"B_b\": \"70\"}, {\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\": \"9\", \"B_b\": \"90\"}, {\"A_a\": \"10\", \"B_b\": \"100\"}]" + } + }, + "taskInfo": { + "name": "chicken" + } + }, + "for-loop-4": { + "componentRef": { + "name": "comp-for-loop-4" + }, + "iteratorPolicy": { + "parallelismLimit": 4 + }, + "parameterIterator": { + "itemInput": "pipelinechannel--loop-item-param-3", + "items": { + "raw": "[{\"A_a\": \"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\": \"3\", \"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\": \"5\", \"B_b\": \"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\": \"7\", \"B_b\": \"70\"}, {\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\": \"9\", \"B_b\": \"90\"}, {\"A_a\": \"10\", \"B_b\": \"100\"}]" + } + }, + "taskInfo": { + "name": "muffin" + } + } + } + } + }, + "schemaVersion": "2.1.0", + "sdkVersion": "kfp-2.7.0" + }, + "runtimeConfig": { + "parameters": { + "text": { + "stringValue": "hi there" + } + } + } +} \ No newline at end of file