Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(backend): Add Parallelism Limit to ParallelFor tasks. Fixes #8718 #10798

Open
wants to merge 6 commits into
base: master
Choose a base branch
from

Conversation

gmfrasca
Copy link
Member

@gmfrasca gmfrasca commented May 7, 2024

Description of your changes:
Fixes #8718

Adds the Parallelism item to a DAG template if specified by a task's IteratorPolicy (ie ParallelFor w/ a parallelism limit).

Checklist:

@gmfrasca
Copy link
Member Author

gmfrasca commented May 7, 2024

looks like a CI infra issue?

/retest just to check

Copy link

@gmfrasca: The /retest command does not accept any targets.
The following commands are available to trigger required jobs:

  • /test kfp-kubernetes-test-python310
  • /test kfp-kubernetes-test-python311
  • /test kfp-kubernetes-test-python312
  • /test kfp-kubernetes-test-python38
  • /test kfp-kubernetes-test-python39
  • /test kubeflow-pipeline-backend-test
  • /test kubeflow-pipeline-frontend-test
  • /test kubeflow-pipeline-mkp-snapshot-test
  • /test kubeflow-pipeline-mkp-test
  • /test kubeflow-pipelines-backend-visualization
  • /test kubeflow-pipelines-component-yaml
  • /test kubeflow-pipelines-components-google-cloud-python38
  • /test kubeflow-pipelines-integration-v2
  • /test kubeflow-pipelines-manifests
  • /test kubeflow-pipelines-sdk-docformatter
  • /test kubeflow-pipelines-sdk-execution-tests
  • /test kubeflow-pipelines-sdk-isort
  • /test kubeflow-pipelines-sdk-python310
  • /test kubeflow-pipelines-sdk-python311
  • /test kubeflow-pipelines-sdk-python312
  • /test kubeflow-pipelines-sdk-python38
  • /test kubeflow-pipelines-sdk-python39
  • /test kubeflow-pipelines-sdk-yapf
  • /test test-kfp-runtime-code-python310
  • /test test-kfp-runtime-code-python311
  • /test test-kfp-runtime-code-python312
  • /test test-kfp-runtime-code-python38
  • /test test-kfp-runtime-code-python39
  • /test test-run-all-gcpc-modules
  • /test test-upgrade-kfp-sdk

The following commands are available to trigger optional jobs:

  • /test kfp-kubernetes-execution-tests
  • /test kubeflow-pipeline-e2e-test
  • /test kubeflow-pipeline-upgrade-test
  • /test kubeflow-pipeline-upgrade-test-v2
  • /test kubeflow-pipelines-samples-v2

Use /test all to run the following jobs that were automatically triggered:

  • kubeflow-pipeline-backend-test
  • kubeflow-pipeline-e2e-test
  • kubeflow-pipeline-upgrade-test
  • kubeflow-pipeline-upgrade-test-v2
  • kubeflow-pipelines-samples-v2

In response to this:

looks like a CI infra issue?

/retest just to check

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

@gmfrasca
Copy link
Member Author

gmfrasca commented May 7, 2024

/retest

@HumairAK
Copy link
Contributor

HumairAK commented May 9, 2024

tested with this pipeline:

parallelfor.py
from kfp import compiler
from kfp import dsl
from kfp.dsl import Input, InputPath, Output, OutputPath, Dataset, Model, component

@dsl.component(base_image="quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.0")
def preprocess(
        message: Input[str],
        output_model: Output[Model]
):
    import random
    line = "some_model"
    print(f"Message: {message}")
    with open(output_model.path, 'w') as output_file:
        output_file.write('line: {}'.format(line))
    output_model.metadata['accuracy'] = random.uniform(0, 1)


@dsl.component(base_image="quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.0")
def train(
        model: Input[Model],
        epoch: Input[int],
        trained_model: Output[Model],
):
    import random
    line = "some_model"
    print(f"Train for epoch: {epoch}")
    with open(trained_model.path, 'w') as output_file:
        output_file.write('line: {}'.format(line))
    trained_model.metadata['accuracy'] = random.uniform(0, 1)


@dsl.pipeline(pipeline_root='', name='tutorial-data-passing')
def data_passing_pipeline():
    preprocess_task = preprocess(message="dataset").set_caching_options(enable_caching=False)
    with dsl.ParallelFor(items=[1, 5, 10, 25], parallelism=2) as epochs:
        train(model=preprocess_task.outputs['output_model'], epoch=epochs).set_caching_options(enable_caching=False)


if __name__ == '__main__':
    compiler.Compiler().compile(data_passing_pipeline, __file__ + '.yaml')

Worked successfully:

~ $ kubectl -n ${kfp_ns} get workflow tutorial-data-passing-drsfz -o yaml | yq '.spec.templates[-2]'
dag:
  tasks:
    - arguments:
        parameters:
          - name: component
            value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-for-loop-2}}'
          - name: parent-dag-id
            value: '{{inputs.parameters.parent-dag-id}}'
          - name: task
            value: '{"componentRef":{"name":"comp-for-loop-2"},"dependentTasks":["preprocess"],"inputs":{"artifacts":{"pipelinechannel--preprocess-output_model":{"taskOutputArtifact":{"outputArtifactKey":"output_model","producerTask":"preprocess"}}}},"iteratorPolicy":{"parallelismLimit":2},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-1","items":{"raw":"[1, 5, 10, 25]"}},"taskInfo":{"name":"for-loop-2"}}'
      depends: preprocess.Succeeded
      name: for-loop-2-driver
      template: system-dag-driver
    - arguments:
        parameters:
          - name: parent-dag-id
            value: '{{tasks.for-loop-2-driver.outputs.parameters.execution-id}}'
          - name: iteration-index
            value: '{{item}}'
      depends: for-loop-2-driver.Succeeded
      name: for-loop-2-iterations
      template: comp-for-loop-2-for-loop-2
      withSequence:
        count: '{{tasks.for-loop-2-driver.outputs.parameters.iteration-count}}'
    - arguments:
        parameters:
          - name: component
            value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-preprocess}}'
          - name: task
            value: '{"cachingOptions":{},"componentRef":{"name":"comp-preprocess"},"inputs":{"parameters":{"message":{"runtimeValue":{"constant":"dataset"}}}},"taskInfo":{"name":"preprocess"}}'
          - name: container
            value: '{{workflow.annotations.pipelines.kubeflow.org/implementations-comp-preprocess}}'
          - name: parent-dag-id
            value: '{{inputs.parameters.parent-dag-id}}'
      name: preprocess-driver
      template: system-container-driver
    - arguments:
        parameters:
          - name: pod-spec-patch
            value: '{{tasks.preprocess-driver.outputs.parameters.pod-spec-patch}}'
          - default: "false"
            name: cached-decision
            value: '{{tasks.preprocess-driver.outputs.parameters.cached-decision}}'
      depends: preprocess-driver.Succeeded
      name: preprocess
      template: system-container-executor
inputs:
  parameters:
    - name: parent-dag-id
metadata:
  annotations:
    sidecar.istio.io/inject: "false"
name: root
outputs: {}
parallelism: 2

Note the:

parallelism: 2

The UI feedback on this could be better:

image

Currently all 4 iterations show executing at the same time (they also never really show the done checkmark even once they finish). But this problem existed prior to this, and seems out of scope for this task, and should be in a follow up issue.

However I confirmed the pods are scheduled 2 at a time in this example (since parallelism = 2 in this example).

@HumairAK
Copy link
Contributor

HumairAK commented May 9, 2024

Hrmm testing it with the above pipeline amended to:

@dsl.pipeline(pipeline_root='', name='tutorial-data-passing')
def data_passing_pipeline():
    preprocess_task = preprocess(message="dataset").set_caching_options(enable_caching=False)
    with dsl.ParallelFor(items=[1, 5, 10, 25], parallelism=2) as epochs:
        train(model=preprocess_task.outputs['output_model'], epoch=epochs).set_caching_options(enable_caching=False)

    with dsl.ParallelFor(items=[6, 12, 24, 48], parallelism=4) as epochs:
        train(model=preprocess_task.outputs['output_model'], epoch=epochs).set_caching_options(enable_caching=False)

It looks like the workflow will use parallelism = 4 for both

@gmfrasca
Copy link
Member Author

gmfrasca commented May 9, 2024

as @HumairAK noted it looks like there's a problem when there are multiple ParallelFor components in a single DAG, as the parallelism mechanism applies to the entire DAG, and as it stands there's no way to assign various parallelismLimits to individual tasks within a DAG template.

The obvious solution/workaround for this is to add another layer of abstraction on iterator tasks, (ie, root DAG calls a new "XYZ-iterator" DAG, which contains the withSequence iterator and parallelism limit value), but this will make the DAG a bit more complex than it already is, and I'm not sure at this moment what the consequences of updating the workflow path to this may be. will need a bit to investigate.

@hsteude
Copy link
Contributor

hsteude commented May 10, 2024

as @HumairAK noted it looks like there's a problem when there are multiple ParallelFor components in a single DAG, as the parallelism mechanism applies to the entire DAG, and as it stands there's no way to assign various parallelismLimits to individual tasks within a DAG template.

The obvious solution/workaround for this is to add another layer of abstraction on iterator tasks, (ie, root DAG calls a new "XYZ-iterator" DAG, which contains the withSequence iterator and parallelism limit value), but this will make the DAG a bit more complex than it already is, and I'm not sure at this moment what the consequences of updating the workflow path to this may be. will need a bit to investigate.

Hi @gmfrasca, I think it's really important to have a limit that applies to the individual for loop, not the entire DAG. Have you considered using Argo's implementation of loop parallelism? Using that might even simplify the DAG. However, implementing this could lead to issues with other backends.

@gmfrasca
Copy link
Member Author

Hey @hsteude - so this implementation actually already leverages the Argo loop parallelism mechanism. The issue here is that the current compiled architecture of a pipeline aggregates all KFP pipeline steps into sequential tasks of a top-level root DAG Template, but the finest granularity you can specify that limit is at the Template level, not an individual DAGTask. Essentially, we do not have the concept of parallelism on a per-step basis to use in this current state.

The workaround/DAG re-architecture I mentioned above would bump out each of these steps to call their own intermediate Template, each time with its own DAG and iterator, and this template would simply call the component Template itself. With that, we could then specify individual parallelism limits for individual steps, since they are now encapsulated in a Template, at the cost of introducing another layer of abstraction/templating

@google-oss-prow google-oss-prow bot added size/XL and removed size/XS labels May 13, 2024
@gmfrasca
Copy link
Member Author

/test kubeflow-pipelines-samples-v2

Copy link

@gmfrasca: The following tests failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name Commit Details Required Rerun command
kubeflow-pipeline-e2e-test 9780650 link false /test kubeflow-pipeline-e2e-test
kubeflow-pipelines-samples-v2 fb8a3ff link false /test kubeflow-pipelines-samples-v2
kubeflow-pipeline-upgrade-test fb8a3ff link false /test kubeflow-pipeline-upgrade-test

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.

@rimolive
Copy link
Member

/lgtm

@rimolive
Copy link
Member

I reviewed this PR a month ago and this PR is still opened. I think if would be good having one more lgtm to ensure this PR is still good to merge.

@gregsheremeta
Copy link
Contributor

/hold

does hold work here? I'd like to review this before it's merged 😄

@gregsheremeta
Copy link
Contributor

they also never really show the done checkmark even once they finish

I've seen this in other pipelines too -- ones without loops. Might be anything with a sub-DAG.

@gregsheremeta
Copy link
Contributor

I'm caught off guard a bit because it looks like ParallelFor was mostly already working, except for the limit? And this PR fixes the limit and that's about it, right?

Is #8718 the correct Issue to fix, then? Should it be tweaked to call out that only parallelization limit was left? Or should 8718 be closed and a new Issue opened?

@gregsheremeta
Copy link
Contributor

While running this PR in an attempt to validate it and understand it better, I found that I can reliably cause an apiserver panic. The same pipeline works fine on both upstream master of https://github.com/kubeflow/pipelines and https://github.com/opendatahub-io/data-science-pipelines.

test pipeline (adapted from what Humair posted above):

from kfp import compiler
from kfp import dsl
from kfp.dsl import Input, Output, Model

@dsl.component(base_image="python:3.12")
def preprocess(
        message: Input[str],
        output_model: Output[Model]
):
    line = "useful_data"
    print(f"Message: {message}")
    with open(output_model.path, 'w') as output_file:
        output_file.write('line: {}'.format(line))

@dsl.component(base_image="python:3.12")
def train(
        model: Input[Model],
        counter: Input[int],
        trained_model: Output[Model],
):
    import random
    line = "some_model"
    print(f"train loop counter: {counter}")
    with open(trained_model.path, 'w') as output_file:
        output_file.write('line: {}'.format(line))
    trained_model.metadata['accuracy'] = random.uniform(0, 1)

@dsl.pipeline(pipeline_root='', name='pl-1')
def parallel_limit_test_pipeline():
    preprocess_task = preprocess(message="hello-world").set_caching_options(enable_caching=False)
    
    with dsl.ParallelFor(items=[1, 2, 3, 4], parallelism=2) as counter:
        train(model=preprocess_task.outputs['output_model'], counter=counter).set_caching_options(enable_caching=False)

if __name__ == '__main__':
    compiler.Compiler().compile(parallel_limit_test_pipeline, __file__ + '.yaml')

compiled version of that pipeline:

# PIPELINE DEFINITION
# Name: pl-1
components:
  comp-for-loop-2:
    dag:
      tasks:
        train:
          cachingOptions: {}
          componentRef:
            name: comp-train
          inputs:
            artifacts:
              model:
                componentInputArtifact: pipelinechannel--preprocess-output_model
            parameters:
              counter:
                componentInputParameter: pipelinechannel--loop-item-param-1
          taskInfo:
            name: train
    inputDefinitions:
      artifacts:
        pipelinechannel--preprocess-output_model:
          artifactType:
            schemaTitle: system.Model
            schemaVersion: 0.0.1
      parameters:
        pipelinechannel--loop-item-param-1:
          parameterType: NUMBER_INTEGER
  comp-preprocess:
    executorLabel: exec-preprocess
    inputDefinitions:
      parameters:
        message:
          parameterType: STRING
    outputDefinitions:
      artifacts:
        output_model:
          artifactType:
            schemaTitle: system.Model
            schemaVersion: 0.0.1
  comp-train:
    executorLabel: exec-train
    inputDefinitions:
      artifacts:
        model:
          artifactType:
            schemaTitle: system.Model
            schemaVersion: 0.0.1
      parameters:
        counter:
          parameterType: NUMBER_INTEGER
    outputDefinitions:
      artifacts:
        trained_model:
          artifactType:
            schemaTitle: system.Model
            schemaVersion: 0.0.1
deploymentSpec:
  executors:
    exec-preprocess:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - preprocess
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
          $0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)


          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"

          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\nfrom builtins import str\n\ndef preprocess(\n        message: Input[str],\n\
          \        output_model: Output[Model]\n):\n    line = \"useful_data\"\n \
          \   print(f\"Message: {message}\")\n    with open(output_model.path, 'w')\
          \ as output_file:\n        output_file.write('line: {}'.format(line))\n\n"
        image: python:3.12
    exec-train:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - train
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
          $0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)


          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"

          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\nfrom builtins import int\n\ndef train(\n        model: Input[Model],\n\
          \        counter: Input[int],\n        trained_model: Output[Model],\n):\n\
          \    import random\n    line = \"some_model\"\n    print(f\"train loop counter:\
          \ {counter}\")\n    with open(trained_model.path, 'w') as output_file:\n\
          \        output_file.write('line: {}'.format(line))\n    trained_model.metadata['accuracy']\
          \ = random.uniform(0, 1)\n\n"
        image: python:3.12
pipelineInfo:
  name: pl-1
root:
  dag:
    tasks:
      for-loop-2:
        componentRef:
          name: comp-for-loop-2
        dependentTasks:
        - preprocess
        inputs:
          artifacts:
            pipelinechannel--preprocess-output_model:
              taskOutputArtifact:
                outputArtifactKey: output_model
                producerTask: preprocess
        iteratorPolicy:
          parallelismLimit: 2
        parameterIterator:
          itemInput: pipelinechannel--loop-item-param-1
          items:
            raw: '[1, 2, 3, 4]'
        taskInfo:
          name: for-loop-2
      preprocess:
        cachingOptions: {}
        componentRef:
          name: comp-preprocess
        inputs:
          parameters:
            message:
              runtimeValue:
                constant: hello-world
        taskInfo:
          name: preprocess
schemaVersion: 2.1.0
sdkVersion: kfp-2.7.0

The pipeline uploads fine and renders fine. However, when I run it, I get the following error in the UI:

Run creation failed
{"error":"Failed to create a new run: InternalServerError: Failed to validate workflow for (): 
templates.entrypoint.tasks.root templates.root.tasks.for-loop-2-loop templates.comp-for-loop-2-loop-for-loop-2 
sorting failed: invalid dependency preprocess","code":13,"message":"Failed to create a new run: InternalServerError: 
Failed to validate workflow for (): templates.entrypoint.tasks.root templates.root.tasks.for-loop-2-loop 
templates.comp-for-loop-2-loop-for-loop-2 sorting failed: invalid dependency preprocess","details":
[{"@type":"type.googleapis.com/google.rpc.Status","code":13,"message":"Internal Server Error"}]}

I can see the following in the apiserver logs:


I0801 17:54:36.656761       1 interceptor.go:29] /kubeflow.pipelines.backend.api.v2beta1.RunService/CreateRun handler starting
I0801 17:54:36.670814       1 error.go:278] templates.entrypoint.tasks.root templates.root.tasks.for-loop-2-loop templates.comp-for-loop-2-loop-for-loop-2 sorting failed: invalid dependency preprocess
InternalServerError: Failed to validate workflow for ()
github.com/kubeflow/pipelines/backend/src/common/util.NewInternalServerError
	/opt/app-root/src/backend/src/common/util/error.go:144
github.com/kubeflow/pipelines/backend/src/apiserver/resource.(*ResourceManager).CreateRun
	/opt/app-root/src/backend/src/apiserver/resource/resource_manager.go:516
github.com/kubeflow/pipelines/backend/src/apiserver/server.(*RunServer).createRun
	/opt/app-root/src/backend/src/apiserver/server/run_server.go:131
github.com/kubeflow/pipelines/backend/src/apiserver/server.(*RunServer).CreateRun
	/opt/app-root/src/backend/src/apiserver/server/run_server.go:500
github.com/kubeflow/pipelines/backend/api/v2beta1/go_client._RunService_CreateRun_Handler.func1
	/opt/app-root/src/backend/api/v2beta1/go_client/run.pb.go:2711
main.apiServerInterceptor
	/opt/app-root/src/backend/src/apiserver/interceptor.go:30
github.com/kubeflow/pipelines/backend/api/v2beta1/go_client._RunService_CreateRun_Handler
	/opt/app-root/src/backend/api/v2beta1/go_client/run.pb.go:2713
google.golang.org/grpc.(*Server).processUnaryRPC
	/opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:1335
google.golang.org/grpc.(*Server).handleStream
	/opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:1712
google.golang.org/grpc.(*Server).serveStreams.func1.1
	/opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:947
runtime.goexit
	/usr/lib/golang/src/runtime/asm_amd64.s:1650
Failed to create a new run
github.com/kubeflow/pipelines/backend/src/common/util.(*UserError).wrap
	/opt/app-root/src/backend/src/common/util/error.go:271
github.com/kubeflow/pipelines/backend/src/common/util.Wrap
	/opt/app-root/src/backend/src/common/util/error.go:350
github.com/kubeflow/pipelines/backend/src/apiserver/server.(*RunServer).CreateRun
	/opt/app-root/src/backend/src/apiserver/server/run_server.go:502
github.com/kubeflow/pipelines/backend/api/v2beta1/go_client._RunService_CreateRun_Handler.func1
	/opt/app-root/src/backend/api/v2beta1/go_client/run.pb.go:2711
main.apiServerInterceptor
	/opt/app-root/src/backend/src/apiserver/interceptor.go:30
github.com/kubeflow/pipelines/backend/api/v2beta1/go_client._RunService_CreateRun_Handler
	/opt/app-root/src/backend/api/v2beta1/go_client/run.pb.go:2713
google.golang.org/grpc.(*Server).processUnaryRPC
	/opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:1335
google.golang.org/grpc.(*Server).handleStream
	/opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:1712
google.golang.org/grpc.(*Server).serveStreams.func1.1
	/opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:947
runtime.goexit
	/usr/lib/golang/src/runtime/asm_amd64.s:1650
/kubeflow.pipelines.backend.api.v2beta1.RunService/CreateRun call failed
github.com/kubeflow/pipelines/backend/src/common/util.(*UserError).wrapf
	/opt/app-root/src/backend/src/common/util/error.go:266
github.com/kubeflow/pipelines/backend/src/common/util.Wrapf
	/opt/app-root/src/backend/src/common/util/error.go:337
main.apiServerInterceptor
	/opt/app-root/src/backend/src/apiserver/interceptor.go:32
github.com/kubeflow/pipelines/backend/api/v2beta1/go_client._RunService_CreateRun_Handler
	/opt/app-root/src/backend/api/v2beta1/go_client/run.pb.go:2713
google.golang.org/grpc.(*Server).processUnaryRPC
	/opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:1335
google.golang.org/grpc.(*Server).handleStream
	/opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:1712
google.golang.org/grpc.(*Server).serveStreams.func1.1
	/opt/app-root/src/go/pkg/mod/google.golang.org/[email protected]/server.go:947
runtime.goexit
	/usr/lib/golang/src/runtime/asm_amd64.s:1650

removing the parallelism=2 like so doesn't help:

    with dsl.ParallelFor(items=[1, 2, 3, 4]) as counter:
        train(model=preprocess_task.outputs['output_model'], counter=counter).set_caching_options(enable_caching=False)

removing the loop entirely does help, i.e. no more panic:

    train(model=preprocess_task.outputs['output_model'], counter=1).set_caching_options(enable_caching=False)

leaving the hold in place :)

@gmfrasca
Copy link
Member Author

gmfrasca commented Sep 6, 2024

I was able to replicate the problem - this occurs when a ParallelFor task has a dependantTask before it, and the new extra layer of abstraction meant the DAG Driver was not pointed at the root task list and therefore could not detect the external dependencies. Also, the 'iterator' task in the abstracted DAG itself was using the Argo depends functionality which also only works with tasks within the same DAG; since the root task also leverages this it is safe to remove from the sub-DAG templates.

Pushing an update that should address the problem @gregsheremeta found. An in-depth re-review would be greatly appreciated given the complexity of the problem

Copy link

New changes are detected. LGTM label has been removed.

@google-oss-prow google-oss-prow bot removed the lgtm label Sep 6, 2024
Copy link

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please ask for approval from rimolive. For more information see the Kubernetes Code Review Process.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

- Passthrough ParentDagID rather than DriverExecutionID to iterator such
  that iteration item correctly detects dependentTasks.
- Remove depends from iterator DAG as it is already handled by
  root-level task
- Update Iterator template names/nomenclature for clarity
- Update tests accordingly

Signed-off-by: Giulio Frasca <[email protected]>
@gmfrasca
Copy link
Member Author

/retest

@rimolive
Copy link
Member

/rerun-all

@gregsheremeta
Copy link
Contributor

ready for re-review per KFP community meeting last week

[Giulio] Parallellism limits PR/ Re-review
Updated to handle template dependency and DAG parameter passing correctly
In-depth review requested, particularly if anyone has expertise in the DAG/template generation from argocompiler, in case any other functionality was not accounted for

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

feat(backend): Support loop parallelism
6 participants