[Flyte][3][flytepropeller][Attribute Access][flytectl] Binary IDL With MessagePack #5763

Merged Oct 5, 2024 (11 commits)

@Future-Outlier Future-Outlier commented Sep 20, 2024

Tracking Issue

Issue #5318

Why are these changes needed?

  1. Support attribute access with 100% type correctness.
  2. Add support for flytectl to create and get executions.

Note: This PR includes handling of primitive types, Flyte types, and dataclasses.

What changes are proposed in this pull request?

Attribute Access

  1. Use MsgPack to deserialize msgpack bytes in Binary Literal.
  2. Retrieve attributes from the deserialized object.
  3. Serialize the retrieved attributes back to Binary Literal.
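
A minimal sketch of the three steps above, not the propeller implementation. The function name `resolve_attr_path` and the pluggable `decode`/`encode` parameters are hypothetical, and JSON stands in for MessagePack only so that the sketch runs with the standard library.

```python
import json
from typing import Any, Callable, Sequence, Union

PathItem = Union[str, int]  # hypothetical: a string key or an integer index


def resolve_attr_path(
    payload: bytes,
    path: Sequence[PathItem],
    decode: Callable[[bytes], Any],
    encode: Callable[[Any], bytes],
) -> bytes:
    """Decode the payload, walk the attribute path, and re-encode the result."""
    current = decode(payload)   # step 1: bytes -> in-memory object
    for item in path:           # step 2: follow each path element
        current = current[item]
    return encode(current)      # step 3: in-memory object -> bytes


# Stand-in codec (assumption): JSON instead of MessagePack.
def json_decode(data: bytes) -> Any:
    return json.loads(data.decode("utf-8"))


def json_encode(obj: Any) -> bytes:
    return json.dumps(obj).encode("utf-8")


payload = json_encode({"a": 1, "c": "example_string", "items": [{"a": 10}]})
result = resolve_attr_path(payload, ["items", 0, "a"], json_decode, json_encode)
print(json_decode(result))
```

Passing the codec in as a parameter keeps the path-walking logic independent of the wire format, so a real MessagePack decoder could be swapped in without touching the resolver.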

flytectl

  • When receiving non-string input in flytectl, serialize it to msgpack bytes and create a binary scalar directly.
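
To illustrate what "serialize it to msgpack bytes and create a binary scalar" could look like, here is a hedged sketch. The `pack` function covers only a tiny subset of MessagePack (booleans, small non-negative ints, floats, short strings, small maps); real code would use a MessagePack library. `to_literal` and the dictionary layout with `value` and `tag` keys are hypothetical stand-ins, not flytectl's actual types.

```python
import struct
from typing import Any, Dict


def pack(obj: Any) -> bytes:
    """Encode a small subset of MessagePack, enough for the example input."""
    if isinstance(obj, bool):
        return b"\xc3" if obj else b"\xc2"            # true / false
    if isinstance(obj, int) and 0 <= obj <= 0x7F:
        return bytes([obj])                           # positive fixint
    if isinstance(obj, float):
        return b"\xcb" + struct.pack(">d", obj)       # float 64, big-endian
    if isinstance(obj, str):
        raw = obj.encode("utf-8")
        if len(raw) < 32:
            return bytes([0xA0 | len(raw)]) + raw     # fixstr
    if isinstance(obj, dict) and len(obj) < 16:
        body = b"".join(pack(k) + pack(v) for k, v in obj.items())
        return bytes([0x80 | len(obj)]) + body        # fixmap
    raise TypeError(f"not covered by this sketch: {obj!r}")


def to_literal(value: Any) -> Dict[str, Any]:
    """Hypothetical: strings keep their existing path, everything else becomes binary."""
    if isinstance(value, str):
        return {"string_value": value}
    return {"binary": {"value": pack(value), "tag": "msgpack"}}


literal = to_literal({"a": 1, "b": 3.14, "c": "example_string"})
print(literal["binary"]["tag"], literal["binary"]["value"].hex())
```

The input mirrors the `dc` value from the execution file shown later in this description.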


How was this patch tested?

Attribute Access

unit tests and remote execution

flytectl

command line

  • create execution
flytectl create execution --execFile build/PR/JSON/stacked_PRs/flytectl/create_task.yaml -p flytesnacks -d development

create_task.yaml

iamRoleARN: ""
inputs:
    dc:
        a: 1
        b: 3.14
        c: example_string
envs: {}
kubeServiceAcct: ""
targetDomain: ""
targetProject: ""
task: build.PR.JSON.stacked_PRs.flytectl.dataclass_simple.t_simple
version: kNjxOflWVudmuIRR4tLHeQ
  • get execution
flytectl get task -d development -p flytesnacks build.PR.JSON.stacked_PRs.flytectl.dataclass_simple.t_simple --execFile build/PR/JSON/stacked_PRs/flytectl/get_task.yaml --version kNjxOflWVudmuIRR4tLHeQ

get_task.yaml

iamRoleARN: ""
inputs:
    dc: {}
envs: {}
kubeServiceAcct: ""
targetDomain: ""
targetProject: ""
task: build.PR.JSON.stacked_PRs.flytectl.dataclass_simple.t_simple
version: kNjxOflWVudmuIRR4tLHeQ

Setup process

single binary

Screenshots

  • Unit Tests: [screenshot]
  • Local Execution: [screenshot]
  • Remote Execution: [screenshot]

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

codecov bot commented Sep 20, 2024

Codecov Report

Attention: Patch coverage is 82.60870% with 16 lines in your changes missing coverage. Please review.

Project coverage is 36.35%. Comparing base (881d7a2) to head (34914db).
Report is 2 commits behind head on master.

Files with missing lines                                  Patch %   Lines
...opeller/pkg/controller/nodes/attr_path_resolver.go     78.37%    12 Missing and 4 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #5763      +/-   ##
==========================================
+ Coverage   36.31%   36.35%   +0.03%     
==========================================
  Files        1304     1304              
  Lines      110072   110147      +75     
==========================================
+ Hits        39974    40044      +70     
  Misses      65936    65936              
- Partials     4162     4167       +5     
Flag Coverage Δ
unittests-datacatalog 51.37% <ø> (ø)
unittests-flyteadmin 55.60% <ø> (ø)
unittests-flytecopilot 12.17% <ø> (ø)
unittests-flytectl 62.26% <ø> (+0.04%) ⬆️
unittests-flyteidl 7.17% <100.00%> (+0.04%) ⬆️
unittests-flyteplugins 53.35% <ø> (ø)
unittests-flytepropeller 42.02% <78.66%> (+0.10%) ⬆️
unittests-flytestdlib 55.35% <ø> (-0.03%) ⬇️


Signed-off-by: Future-Outlier <[email protected]>
Future-Outlier and others added 3 commits September 25, 2024 09:38
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
@Future-Outlier Future-Outlier changed the title [Flyte][3][Attribute Access] Binary IDL With MessagePack [Flyte][3][flytepropeller][Attribute Access][flytectl] Binary IDL With MessagePack Sep 26, 2024
Comment on lines -127 to -133
t.Run("Generic", func(t *testing.T) {
    literalVal := map[string]interface{}{
        "x": 1,
        "y": "ystringvalue",
    }
    var literalType = &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_STRUCT}}
    lit, err := MakeLiteralForType(literalType, literalVal)
@@ -559,12 +562,35 @@ func MakeLiteralForType(t *core.LiteralType, v interface{}) (*core.Literal, erro
    strValue = fmt.Sprintf("%.0f", math.Trunc(f))
}
if newT.Simple == core.SimpleType_STRUCT {
    // If the type is a STRUCT, we expect the input to be a complex object
Contributor

Won't this break if clients or the other side still have the old view of STRUCT?
That is, this seems backward incompatible.
cc @EngHabu

Member Author

This will not break the JSON string case, since this is not a string type.

Future-Outlier commented Oct 4, 2024

Here is how I tested backward compatibility (simulating a user upgrading propeller while an execution is running):

  1. Start the Flyte single binary from the master branch (old).
  2. Run the example below.
  3. Midway through the execution, shut the single binary down.
  4. Start the Flyte single binary from the msgpack IDL branch (new).

[Screenshots: shutting down the Flyte cluster and switching to the new Flyte cluster]

python example:

import os
import tempfile
from dataclasses import dataclass

import pandas as pd
from flytekit import task, workflow, ImageSpec, dynamic
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile
from flytekit.types.structured import StructuredDataset
from mashumaro.mixins.json import DataClassJSONMixin

image = ImageSpec(
    base_image="ghcr.io/flyteorg/flytekit:py3.9-1.11.0",
    packages=["pandas", "pyarrow"],
    registry="localhost:30000"
)

@dataclass
class Datum(DataClassJSONMixin):
    x: int
    y: str
    z: dict[int, str]

@task(container_image=image)
def stringify(s: int) -> Datum:
    """
    A dataclass return will be treated as a single complex JSON return.
    """
    return Datum(x=s, y=str(s), z={s: str(s)})


@task(container_image=image)
def add(x: Datum, y: Datum) -> Datum:
    """
    Flytekit automatically converts the provided JSON into a data class.
    If the structures don't match, it triggers a runtime failure.
    """
    x.z.update(y.z)
    return Datum(x=x.x + y.x, y=x.y + y.y, z=x.z)

@dataclass
class FlyteTypes(DataClassJSONMixin):
    dataframe: StructuredDataset
    file: FlyteFile
    directory: FlyteDirectory


@task(container_image=image)
def upload_data() -> FlyteTypes:
    """
    Flytekit will upload FlyteFile, FlyteDirectory and StructuredDataset to the blob store,
    such as GCP or S3.
    """
    # 1. StructuredDataset
    df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]})

    # 2. FlyteDirectory
    temp_dir = tempfile.mkdtemp(prefix="flyte-")
    df.to_parquet(temp_dir + "/df.parquet")

    # 3. FlyteFile
    file_path = tempfile.NamedTemporaryFile(delete=False)
    file_path.write(b"Hello, World!")
    file_path.close()  # flush the contents to disk before the file is uploaded

    fs = FlyteTypes(
        dataframe=StructuredDataset(dataframe=df),
        file=FlyteFile(file_path.name),
        directory=FlyteDirectory(temp_dir),
    )
    return fs


@task(container_image=image)
def download_data(res: FlyteTypes):
    assert pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]}).equals(res.dataframe.open(pd.DataFrame).all())
    with open(res.file, "r") as f:
        assert f.read() == "Hello, World!"
    assert os.listdir(res.directory) == ["df.parquet"]

@dynamic(container_image=image)
def dataclass_wf(x: int, y: int) -> (Datum, FlyteTypes):
    for _ in range(3):
        o1 = add(x=stringify(s=x), y=stringify(s=y))
        o2 = upload_data()
        download_data(res=o2)
    return o1, o2

if __name__ == "__main__":
    from flytekit.clis.sdk_in_container import pyflyte
    from click.testing import CliRunner
    import os

    runner = CliRunner()
    path = os.path.realpath(__file__)
    # result = runner.invoke(pyflyte.main,
    #                        ["run", path, "dataclass_wf", "--x", "-1", "--y", "2"])
    # print("Local Execution: ", result.output)
    # #
    result = runner.invoke(pyflyte.main,
                           ["run", "--remote", path, "dataclass_wf", "--x", "-1", "--y", "2"])
    print("Remote Execution: ", result.output)

Future-Outlier commented Oct 4, 2024

Use the old flytekit with the new propeller
(same Python example as above)

env:
python: 3.9
flyte backend: this branch
flytekit: 1.11.0 (released on Mar 12, 2024)


Contributor

@EngHabu EngHabu left a comment

Hey @Future-Outlier, what happens if a new flytekit produced a msgpack and old flytekit tried to consume it?

@@ -11,14 +12,16 @@ import (
"github.com/flyteorg/flyte/flytestdlib/storage"
)

const messagepack = "msgpack"
Contributor

Please move this const to one place and reference it everywhere...

Member Author

I'll move it now.

flytepropeller/pkg/controller/nodes/attr_path_resolver.go (outdated, resolved)
switch resolvedVal := currVal.(type) {
// map
case map[interface{}]interface{}:
    tmpVal, exist = resolvedVal[attr.GetStringValue()]
Contributor

What will GetStringValue() return if the attr is foo[0]? Shouldn't it fail in that case and say "you can't index into a struct" or something?

    currVal = tmpVal
// list
case []interface{}:
    if int(attr.GetIntValue()) >= len(resolvedVal) {
Contributor

Ditto here. If it's foo.bar, GetIntValue() will return 0, I presume? It should fail here too.

Member Author

Yes, GetIntValue() will return 0, but foo.bar goes to the map case above, so it will call attr.GetStringValue() to get the bar attribute.
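
The behavior discussed in this thread can be sketched as follows. This is a stricter variant than the Go snippet above and not necessarily what the merged code does: it dispatches on the container type first, then rejects a path element of the wrong kind instead of falling back to a zero value. `AttributeResolveError` and `resolve` are hypothetical names.

```python
from typing import Any, Sequence, Union

PathItem = Union[str, int]  # hypothetical: "bar" for foo.bar, 0 for foo[0]


class AttributeResolveError(Exception):
    """Hypothetical error type for this sketch."""


def resolve(value: Any, path: Sequence[PathItem]) -> Any:
    current = value
    for attr in path:
        if isinstance(current, dict):
            # map case: only string keys are accepted
            if not isinstance(attr, str):
                raise AttributeResolveError(f"cannot index into a map with {attr!r}")
            if attr not in current:
                raise AttributeResolveError(f"key {attr!r} not found")
            current = current[attr]
        elif isinstance(current, list):
            # list case: only in-range integer indexes are accepted
            if isinstance(attr, bool) or not isinstance(attr, int):
                raise AttributeResolveError(f"cannot look up key {attr!r} in a list")
            if not 0 <= attr < len(current):
                raise AttributeResolveError(f"index {attr} out of range")
            current = current[attr]
        else:
            raise AttributeResolveError(
                f"cannot resolve {attr!r} on a {type(current).__name__}"
            )
    return current


data = {"foo": {"bar": 1}, "items": [10, 20]}
print(resolve(data, ["foo", "bar"]))
print(resolve(data, ["items", 1]))
```

Every failure surfaces as an error value rather than a crash, which matches the "error (not a panic)" behavior described later in this conversation.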

Comment on lines 160 to 163
// Unsupported serialization format
return nil, errors.Errorf(errors.PromiseAttributeResolveError, nodeID,
    "Unsupported format '%v' found for literal value.\n"+
        "Please ensure the serialization format is supported.", serializationFormat)
Contributor

This case can never happen, right? You already checked for that earlier.

Member Author

Yes, I can remove it
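
A small sketch of the two review points above, under stated assumptions: the format tag is defined once and referenced everywhere, and the format is validated once up front so later code needs no unreachable fallback branch. All names here (`MESSAGEPACK`, `check_format`, `describe_binary`) are hypothetical and do not come from the Flyte codebase.

```python
MESSAGEPACK = "msgpack"  # single shared definition of the format tag
SUPPORTED_FORMATS = frozenset({MESSAGEPACK})


class UnsupportedFormatError(ValueError):
    """Hypothetical error raised for an unknown serialization format."""


def check_format(tag: str) -> str:
    """Validate the format tag once, at the entry point."""
    if tag not in SUPPORTED_FORMATS:
        raise UnsupportedFormatError(
            f"Unsupported format '{tag}' found for literal value."
        )
    return tag


def describe_binary(tag: str, payload: bytes) -> str:
    check_format(tag)
    # From here on the tag is known to be supported, so no second
    # "unsupported format" branch is needed further down.
    return f"{len(payload)} bytes encoded as {tag}"


print(describe_binary(MESSAGEPACK, b"\x81\xa1a\x01"))
```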

@Future-Outlier
Member Author

> Hey @Future-Outlier, what happens if a new flytekit produced a msgpack and old flytekit tried to consume it?

upstream output (msgpack) -> downstream input (msgpack)
new flytekit -> old flytekit

This will fail when the pod plugin executes the task, but it will not crash propeller.
Forward compatibility is not supported, and there's no way to support it. Thank you.

@Future-Outlier
Member Author

  1. Currently, when resolving attributes in a map, an error (not a panic) is raised if the key is not a string.
    (This is backward compatible.)
  2. We use map[any]any because the Go msgpack library deserializes msgpack bytes produced from a Python dataclass into this type.

Details about 1:
We will support cases like the one below in the future.
Before that, we have to change flytekit first: currently the case below is typed as Struct rather than int.

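from dataclasses import dataclass
from typing import Dict, List

@dataclass
class DC:  # not shown in the original comment; assumed here to have an int field `a`
    a: int

@dataclass
class DC2:
    a: float
    b: DC
    c: List[DC]
    d: Dict[int, DC]

o = DC2(a=1.0, b=DC(a=1), c=[DC(a=1)], d={1: DC(a=1)})
x = o.d[1].a  # This should be `int` type, but will return `Struct` type when using the `get literal type` function.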

@Future-Outlier Future-Outlier merged commit 9abfbda into master Oct 5, 2024
53 checks passed
@Future-Outlier Future-Outlier deleted the binary-idl-with-message-pack-bytes-3 branch October 5, 2024 01:01
siiddhantt pushed a commit to siiddhantt/flyte that referenced this pull request Oct 7, 2024