Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JSON parsing for inline scripts needs custom processing in certain situations (fan-out/in) #1146

Closed
2 of 6 tasks
ljyanesm opened this issue Aug 5, 2024 · 4 comments
Closed
2 of 6 tasks

Comments

@ljyanesm
Copy link

ljyanesm commented Aug 5, 2024

Pre-bug-report checklist

1. This bug can be reproduced using pure Argo YAML

If yes, it is more likely to be an Argo bug unrelated to Hera. Please double check before submitting an issue to Hera.

2. This bug occurs in Hera when...

  • exporting to YAML
  • submitting to Argo
  • running on Argo with the Hera runner
  • other:

Handling JSON serialised objects generated by Argo Worfklows using the json.loads method included at the beginning of all Scripts

Bug report

Describe the bug
A clear and concise description of what the bug is:

When using nested fan-in patterns on Script tasks the json parsing code introduced to handle Argo parameters can fail, see:

try: message = json.loads(r'''{{inputs.parameters.in_param}}''')
except: message = r'''{{inputs.parameters.in_param}}'''

Unfortunately this results in a broken behaviour if one uses a nested fan-out and fan-in pattern. In these cases the Argo YAML generated isn’t aware of the multiple levels of serialisation, so one ends up with a top level JSON object that contains JSON that has been stringified.

For example consider a nested fan-out fan-in with 2 items and 2 sub items from which one would like to receive the data structure below back during the final fan-in.

[["item_0_subitem_0", "item_0_subitem_1"], ["item_1_subitem_0", "item_1_subitem_1"]]

In practise one receives the two item list below, where each item is a stringified version of the nested list.

["["\"item_0_subitem_0\"","\"item_0_subitem_1\""]","["\"item_1_subitem_0\"","\"item_1_subitem_1\""]"]
The defect is that the content has been serialised twice, but only deserialised once. To fix this one needs a way to deserialise, then iterate over all the items in the data structure and deserialise those.

Error log if applicable:

There is no immediate error apart from the inconvenience of having to handle a partially deserialised JSON object.

To Reproduce
Full Hera code to reproduce the bug:

from hera.workflows import Parameter, Steps, Workflow, script
from hera.workflows.models import ValueFrom

PROCESS_INNER = "process-inner"
@script(outputs=Parameter(name="items", value_from=ValueFrom(path="/tmp/items.json")))
def generate_outer_items():
    import json
    json.dump([f"item_{i}" for i in range(2)],open(f"/tmp/items.json", "w"))


@script(outputs=Parameter(name="processed_item", value_from=ValueFrom(path="/tmp/processed_item.json")))
def generate_inner_items(item: str):
    import json
    json.dump([(f"{item}_subitem_{i}", "outer") for i in range(2)], open(f"/tmp/processed_item.json", "w"))


@script(name=PROCESS_INNER,outputs=Parameter(name="processed_subitem", value_from=ValueFrom(path="/tmp/processed_subitem.json")))
def inner(subitem: str):
    import json
    json.dump((subitem, "test"), open(f"/tmp/processed_subitem.json", "w"))


@script()
def fan_in(all_subitem_list):
    import json
    # When passing parameters via Argo, these have to be JSON serialised. Hera manages deserialisation automatically,
    # unfortunately, in this example and other cases where multiple fan-out layers collapse on a single fan-in, the JSON
    # deserialisation is not aware of the workflow's topology and can result in partially deserialised objects.
    # The decode function is used to handle these partially deserialised objects.
    def decode(jo):
        def decode_inner(o):
            try:
                d = json.loads(o)
            except (json.JSONDecodeError, TypeError):
                return o
            if isinstance(d, str):
                return decode_inner(d)
            elif isinstance(d, list):
                return [decode_inner(i) for i in d]
            elif isinstance(d, dict):
                return {k: decode_inner(v) for k, v in d.items()}
            else:
                return d
        return decode_inner(json.dumps(jo))
    print("raw:")
    print(all_subitem_list)
    print("decoded:")
    print(decode(all_subitem_list))


with Workflow(
    generate_name="fan-out-fan-in-nested-",
    entrypoint="main",
) as wf:
    with Steps(name="outer",
               inputs=[Parameter(name="input_item"), Parameter(name="same_value")],
               outputs=[Parameter(name="processed_subitems", value_from=
               ValueFrom(parameter="{{steps.%s.outputs.parameters.processed_subitem}}" % PROCESS_INNER
               ))],
               ) as outer:
        inner_items = generate_inner_items(
            arguments={"item": outer.get_parameter("input_item"), "same_value": "same_for_all"},
        )
        processed_inner_item = inner(
            arguments={"subitem": "{{item}}"},
            with_param=inner_items.get_parameter("processed_item")
        )
    with Steps(name="main"):
        outer_items = generate_outer_items()
        processed_outer_items = outer(
            arguments={"input_item": "{{item}}", "same_value": "same_for_all"},
            with_param=outer_items.get_parameter("items")
        )
        # Note: This is a single fan-in for both fan-out levels (outer, inner), see the `decode` function for how to handle
        # the JSON deserialisation
        fan_in(
            arguments={"all_subitem_list": processed_outer_items.get_parameter("processed_subitems")}
        )

Expected behavior
A clear and concise description of what you expected to happen:

The input object to fan_in should be a valid python object without any remanent JSON strings within.

Environment

  • Hera Version: 5.X.X
  • Python Version: 3.11.X
  • Argo Version: 3.X.X

Additional context
Please notice the decode(jo) which could be used alternatively to the original json.loads(r'''{{inputs.parameters.in_param}}''' to handle this issue.

@ljyanesm ljyanesm changed the title Nested fan-in json parsing is awkward JSON parsing can be awkward in certain situations Aug 5, 2024
@elliotgunton
Copy link
Collaborator

Thanks @ljyanesm! This is definitely on our radar - I think #827 is a similar problem with escaping the nested JSON. I think we'd need a custom decoder the same as you've written for def decode(jo) as Argo itself garbles the JSON that it exports into the output parameters.

@elliotgunton elliotgunton changed the title JSON parsing can be awkward in certain situations JSON parsing for Inline scripts needs custom processing in certain situations (fan-out/in) Aug 21, 2024
@elliotgunton elliotgunton changed the title JSON parsing for Inline scripts needs custom processing in certain situations (fan-out/in) JSON parsing for inline scripts needs custom processing in certain situations (fan-out/in) Aug 21, 2024
@elliotgunton
Copy link
Collaborator

Hey @ljyanesm - turns out this was a bug upstream - argoproj/argo-workflows#13510 - with a PR that's now been merged! I think I'll leave this open until it's actually released

@ljyanesm
Copy link
Author

ljyanesm commented Sep 5, 2024

Thank you @elliotgunton!

This is really great, looking forward to these changes.

@elliotgunton
Copy link
Collaborator

Fix was released in Argo Workflows 3.5.11! 🚀

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants