[Core feature] Support simple dataclass construction from promises in workflow/dynamic #6275

Open
2 tasks done
madwed-stripe opened this issue Feb 26, 2025 · 2 comments
Labels
enhancement New feature or request flytekit FlyteKit Python related issue

madwed-stripe commented Feb 26, 2025

Motivation: Why do you think this is important?

A common pain point for our systems is the need to add a create_dataclass task at the end of a dynamic workflow. It would be wonderful to support dataclass construction from promises in a dynamic (and in a workflow).

Goal: What should the final outcome look like, ideally?

Here's a simple example of what we need to do today:

from dataclasses import dataclass

from flytekit import dynamic, task


@dataclass
class MyCollection:
    values: dict[str, float]


@task
def transform_item(item: float) -> float:
    return 1.0


@task
def make_collection(values: dict[str, float]) -> MyCollection:
    return MyCollection(values)


@dynamic
def transform_collection(collection: MyCollection) -> MyCollection:
    transformed = {k: transform_item(item=v) for k, v in collection.values.items()}
    return make_collection(values=transformed)

We run a large number of these tasks, and the host setup time for the extra make_collection task adds up. We'd like to be able to write:

@dynamic
def transform_collection(collection: MyCollection) -> MyCollection:
    transformed = {k: transform_item(item=v) for k, v in collection.values.items()}
    return MyCollection(transformed)
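
To make the request concrete, here is a toy model in plain Python. It is not flytekit code: `FakePromise` and `resolve_promises` are hypothetical names invented for illustration, and nothing here reflects how Flyte actually represents or resolves promises. It only sketches the desired behavior, where a dataclass built from unresolved task outputs is turned into a fully populated dataclass once those outputs exist, without a separate construction task.

```python
from dataclasses import dataclass, fields, is_dataclass
from typing import Any, Callable


class FakePromise:
    """Hypothetical stand-in for a task output that is not available yet."""

    def __init__(self, compute: Callable[[], Any]) -> None:
        self._compute = compute

    def resolve(self) -> Any:
        return self._compute()


def resolve_promises(value: Any) -> Any:
    """Recursively replace FakePromise objects with their resolved values."""
    if isinstance(value, FakePromise):
        return resolve_promises(value.resolve())
    if isinstance(value, dict):
        return {k: resolve_promises(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return type(value)(resolve_promises(v) for v in value)
    if is_dataclass(value) and not isinstance(value, type):
        # Assumes every field is an __init__ parameter (no init=False fields).
        resolved = {f.name: resolve_promises(getattr(value, f.name)) for f in fields(value)}
        return type(value)(**resolved)
    return value


@dataclass
class MyCollection:
    values: dict


def transform_item(item: float) -> FakePromise:
    # Mimics calling a task inside a dynamic: returns a placeholder, not a float.
    return FakePromise(lambda: 2.0 * item)


collection = MyCollection(values={"A": 1.0, "B": 2.5})

# The dataclass is constructed directly from placeholders ...
pending = MyCollection({k: transform_item(v) for k, v in collection.values.items()})

# ... and only later walked and filled in with concrete values.
result = resolve_promises(pending)
print(result)
```

In this sketch the dataclass acts purely as a container whose fields are resolved after the fact, which is roughly the role the extra make_collection task plays today.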

Describe alternatives you've considered

Maybe this would be possible with a custom transformer, but I'd rather this be part of Flyte core. We could also achieve something similar with @eager, but then the dynamic host has to stay up for the duration of the contained transform_item operations, which can take quite a long time.

Propose: Link/Inline OR Additional context

No response

Are you sure this issue hasn't been raised already?

  • Yes

Have you read the Code of Conduct?

  • Yes
madwed-stripe added the enhancement (New feature or request) and untriaged (This issue has not yet been looked at by the Maintainers) labels Feb 26, 2025

welcome bot commented Feb 26, 2025

Thank you for opening your first issue here! 🛠

eapolinario added the flytekit (FlyteKit Python related issue) label and removed the untriaged (This issue has not yet been looked at by the Maintainers) label Feb 27, 2025

thomasjpfan (Member) commented Feb 27, 2025

A workaround is to start with an empty dict, iterate through the tasks, and add the results to the dict in the dynamic workflow:

from dataclasses import dataclass
from flytekit import task, dynamic, workflow


@dataclass
class MyCollection:
    stuff: dict[str, float]


@task
def transform_item(item: float) -> float:
    return 2.0 * item


@task
def make_collection(values: dict[str, float]) -> MyCollection:
    return MyCollection(values)


@dynamic
def transform_collection(collection: MyCollection) -> MyCollection:
    transformed = {}
    for k, v in collection.stuff.items():
        transformed[k] = transform_item(item=v)
    return make_collection(values=transformed)


@workflow
def wf(collection: MyCollection = MyCollection(stuff={"A": 1.2, "B": 3.4, "C": -1.4})) -> MyCollection:
    return transform_collection(collection=collection)

Projects
Status: Backlog
Development

No branches or pull requests

3 participants