Skip to content

Commit

Permalink
Add examples reproducing raised issues
Browse files Browse the repository at this point in the history
Add examples reproducing issues argoproj-labs#861 (using with_param with an annotated
input) and argoproj-labs#1234 (using with_param with a Pydantic Input type).

Signed-off-by: Alice Purcell <[email protected]>
  • Loading branch information
alicederyn committed Oct 10, 2024
1 parent 2f820a0 commit 3e0177e
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: dynamic-fanout-
spec:
entrypoint: d
templates:
- dag:
tasks:
- name: generate
template: generate
- arguments:
parameters:
- description: this is some value
name: some-value
value: '{{item}}'
depends: generate
name: consume
template: consume
withParam: '{{tasks.generate.outputs.parameters.some-values}}'
name: d
- name: generate
outputs:
parameters:
- name: some-values
valueFrom:
path: /tmp/hera-outputs/parameters/some-values
script:
args:
- -m
- hera.workflows.runner
- -e
- examples.workflows.experimental.script_annotations_dynamic_fanout:generate
command:
- python
env:
- name: hera__script_annotations
value: ''
- name: hera__outputs_directory
value: /tmp/hera-outputs
image: python:3.9
source: '{{inputs.parameters}}'
- inputs:
parameters:
- description: this is some value
name: some-value
name: consume
script:
args:
- -m
- hera.workflows.runner
- -e
- examples.workflows.experimental.script_annotations_dynamic_fanout:consume
command:
- python
env:
- name: hera__script_annotations
value: ''
image: python:3.9
source: '{{inputs.parameters}}'
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: dynamic-fanout-
spec:
entrypoint: d
templates:
- dag:
tasks:
- name: generate
template: generate
- arguments:
parameters:
- description: this is some value
name: some-value
value: '{{item}}'
depends: generate
name: consume
template: consume
withParam: '{{tasks.generate.outputs.parameters.some-values}}'
name: d
- name: generate
outputs:
parameters:
- name: some-values
valueFrom:
path: /tmp/hera-outputs/parameters/some-values
script:
args:
- -m
- hera.workflows.runner
- -e
- examples.workflows.experimental.script_runner_io_dynamic_fanout:generate
command:
- python
env:
- name: hera__script_annotations
value: ''
- name: hera__outputs_directory
value: /tmp/hera-outputs
- name: hera__script_pydantic_io
value: ''
image: python:3.9
source: '{{inputs.parameters}}'
- inputs:
parameters:
- description: this is some value
name: some-value
name: consume
script:
args:
- -m
- hera.workflows.runner
- -e
- examples.workflows.experimental.script_runner_io_dynamic_fanout:consume
command:
- python
env:
- name: hera__script_annotations
value: ''
- name: hera__script_pydantic_io
value: ''
image: python:3.9
source: '{{inputs.parameters}}'
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""
This example showcases how clients can use Hera to dynamically generate tasks that process outputs from one task in
parallel. This is useful for batch jobs and instances where clients do not know ahead of time how many tasks/entities
they may need to process.
"""

from typing import Annotated, List

from hera.shared import global_config
from hera.workflows import DAG, Parameter, Workflow, script

global_config.experimental_features["script_annotations"] = True


@script(constructor="runner")
def generate() -> Annotated[List[int], Parameter(name="some-values")]:
return [i for i in range(10)]


@script(constructor="runner")
def consume(some_value: Annotated[int, Parameter(name="some-value", description="this is some value")]):
print("Received value: {value}!".format(value=some_value))


# assumes you used `hera.set_global_token` and `hera.set_global_host` so that the workflow can be submitted
with Workflow(generate_name="dynamic-fanout-", entrypoint="d") as w:
with DAG(name="d"):
g = generate(arguments={})
c = consume(with_param=g.get_parameter("some-values"))
g >> c
38 changes: 38 additions & 0 deletions examples/workflows/experimental/script_runner_io_dynamic_fanout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""
This example showcases how clients can use Hera to dynamically generate tasks that process outputs from one task in
parallel. This is useful for batch jobs and instances where clients do not know ahead of time how many tasks/entities
they may need to process.
"""

from typing import Annotated, List

from hera.shared import global_config
from hera.workflows import DAG, Input, Output, Parameter, Workflow, script

global_config.experimental_features["script_pydantic_io"] = True


class GenerateOutput(Output):
some_values: Annotated[List[int], Parameter(name="some-values")]


class ConsumeInput(Input):
some_value: Annotated[int, Parameter(name="some-value", description="this is some value")]


@script(constructor="runner")
def generate() -> GenerateOutput:
return GenerateOutput(some_values=[i for i in range(10)])


@script(constructor="runner")
def consume(input: ConsumeInput) -> None:
print("Received value: {value}!".format(value=input.some_value))


# assumes you used `hera.set_global_token` and `hera.set_global_host` so that the workflow can be submitted
with Workflow(generate_name="dynamic-fanout-", entrypoint="d") as w:
with DAG(name="d"):
g = generate(arguments={})
c = consume(with_param=g.get_parameter("some-values"))
g >> c

0 comments on commit 3e0177e

Please sign in to comment.