From 468220b72243a96d582b3ccc7614cbb6da9109e5 Mon Sep 17 00:00:00 2001 From: novahow Date: Mon, 1 Apr 2024 20:25:35 +0800 Subject: [PATCH 01/15] add additional_decks support Signed-off-by: novahow modified: flytekit/core/base_task.py modified: flytekit/core/python_function_task.py modified: flytekit/core/task.py modified: flytekit/deck/deck.py modified: tests/flytekit/unit/core/test_flyte_file.py --- flytekit/core/base_task.py | 78 +++++++++++++++++---- flytekit/core/python_function_task.py | 5 +- flytekit/core/task.py | 5 ++ flytekit/deck/deck.py | 12 ++++ tests/flytekit/unit/core/test_flyte_file.py | 10 +-- 5 files changed, 89 insertions(+), 21 deletions(-) diff --git a/flytekit/core/base_task.py b/flytekit/core/base_task.py index 7411fd635e..4ff4df27e0 100644 --- a/flytekit/core/base_task.py +++ b/flytekit/core/base_task.py @@ -462,6 +462,7 @@ def __init__( environment: Optional[Dict[str, str]] = None, disable_deck: Optional[bool] = None, enable_deck: Optional[bool] = None, + additional_decks: Optional[List[str]] = None, **kwargs, ): """ @@ -477,6 +478,8 @@ def __init__( execution of the task. Supplied as a dictionary of key/value pairs disable_deck (bool): (deprecated) If true, this task will not output deck html file enable_deck (bool): If true, this task will output deck html file + additional_decks (Optional[List[str]]): List of additional decks besides [timeline and source code] to be + generated for this task. Valid values can be selected from [Input, Output] """ super().__init__( task_type=task_type, @@ -487,6 +490,23 @@ def __init__( self._python_interface = interface if interface else Interface() self._environment = environment if environment else {} self._task_config = task_config + self._additional_decks = additional_decks if additional_decks is not None else [] + self._full_deck = enable_deck is True or disable_deck is False + from flytekit.deck.deck import DeckFields + + deck_members = set([_field.value for _field in DeckFields]) + if self._full_deck: + self._additional_decks = list(deck_members - set(self.default_decks)) + # enumerate additional decks, check if any of them are invalid + for deck in self._additional_decks: + if deck not in deck_members: + raise ValueError(f"Deck field {deck} is not a valid deck field. Please use one of {deck_members}") + + # first we resolve the conflict between params regarding decks, if any two of [disable_deck, enable_deck, additional_decks] + # are set, we raise an error + configured_deck_params = [disable_deck is not None, enable_deck is not None, additional_decks is not None] + if sum(configured_deck_params) > 1: + raise ValueError("only one of [disable_deck, enable_deck and additional_decks] can be set") if disable_deck is not None: warnings.warn( @@ -494,16 +514,14 @@ def __init__( FutureWarning, ) - # Confirm that disable_deck and enable_deck do not contradict each other - if disable_deck is not None and enable_deck is not None: - raise ValueError("disable_deck and enable_deck cannot both be set at the same time") - + decks_triggered: bool = enable_deck is not False or additional_decks is not None if enable_deck is not None: self._disable_deck = not enable_deck elif disable_deck is not None: self._disable_deck = disable_deck else: - self._disable_deck = True + self._disable_deck = not decks_triggered + if self._python_interface.docstring: if self.docs is None: self._docs = Documentation( @@ -643,18 +661,20 @@ def _output_to_literal_map(self, native_outputs: Dict[int, Any], ctx: FlyteConte def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_params): if self._disable_deck is False: - from flytekit.deck.deck import Deck, _output_deck + from flytekit.deck.deck import Deck, DeckFields, _output_deck - INPUT = "Inputs" - OUTPUT = "Outputs" + INPUT = DeckFields.INPUT.value + OUTPUT = DeckFields.OUTPUT.value - input_deck = Deck(INPUT) - for k, v in native_inputs.items(): - input_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_input_var(k, v))) + if INPUT in self.decks: + input_deck = Deck(INPUT) + for k, v in native_inputs.items(): + input_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_input_var(k, v))) - output_deck = Deck(OUTPUT) - for k, v in native_outputs_as_map.items(): - output_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_output_var(k, v))) + if OUTPUT in self.decks: + output_deck = Deck(OUTPUT) + for k, v in native_outputs_as_map.items(): + output_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_output_var(k, v))) if ctx.execution_state and ctx.execution_state.is_local_execution(): # When we run the workflow remotely, flytekit outputs decks at the end of _dispatch_execute @@ -789,6 +809,36 @@ def disable_deck(self) -> bool: """ return self._disable_deck + @property + def additional_decks(self) -> List[str]: + """ + If not empty, this task will output deck html file for the specified decks + """ + return self._additional_decks + + @property + def default_decks(self) -> List[str]: + """ + returns the default decks that should be output for this task + """ + from flytekit.deck.deck import DeckFields + + return [DeckFields.TIMELINE, DeckFields.SOURCE_CODE] + + @property + def decks(self) -> List[str]: + """ + returns the decks that should be output for this task + """ + return self.default_decks + self.additional_decks + + @property + def full_deck(self) -> bool: + """ + returns whether the full deck should be output for this task + """ + return self._full_deck + class TaskResolverMixin(object): """ diff --git a/flytekit/core/python_function_task.py b/flytekit/core/python_function_task.py index f97d96296e..c009786895 100644 --- a/flytekit/core/python_function_task.py +++ b/flytekit/core/python_function_task.py @@ -351,6 +351,7 @@ def dynamic_execute(self, task_function: Callable, **kwargs) -> Any: def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_params): if self._disable_deck is False: from flytekit.deck import Deck + from flytekit.deck.deck import DeckFields from flytekit.deck.renderer import PythonDependencyRenderer # These errors are raised if the source code can not be retrieved @@ -358,11 +359,11 @@ def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_param source_code = inspect.getsource(self._task_function) from flytekit.deck.renderer import SourceCodeRenderer - source_code_deck = Deck("Source Code") + source_code_deck = Deck(DeckFields.SOURCE_CODE.value) renderer = SourceCodeRenderer() source_code_deck.append(renderer.to_html(source_code)) - python_dependencies_deck = Deck("Dependencies") + python_dependencies_deck = Deck(DeckFields.DEPENDENCIES.value) renderer = PythonDependencyRenderer() python_dependencies_deck.append(renderer.to_html()) diff --git a/flytekit/core/task.py b/flytekit/core/task.py index f1417feb13..506dca99fb 100644 --- a/flytekit/core/task.py +++ b/flytekit/core/task.py @@ -114,6 +114,7 @@ def task( docs: Optional[Documentation] = ..., disable_deck: Optional[bool] = ..., enable_deck: Optional[bool] = ..., + additional_decks: Optional[List[str]] = ..., pod_template: Optional["PodTemplate"] = ..., pod_template_name: Optional[str] = ..., accelerator: Optional[BaseAccelerator] = ..., @@ -152,6 +153,7 @@ def task( docs: Optional[Documentation] = ..., disable_deck: Optional[bool] = ..., enable_deck: Optional[bool] = ..., + additional_decks: Optional[List[str]] = ..., pod_template: Optional["PodTemplate"] = ..., pod_template_name: Optional[str] = ..., accelerator: Optional[BaseAccelerator] = ..., @@ -189,6 +191,7 @@ def task( docs: Optional[Documentation] = None, disable_deck: Optional[bool] = None, enable_deck: Optional[bool] = None, + additional_decks: Optional[List[str]] = None, pod_template: Optional["PodTemplate"] = None, pod_template_name: Optional[str] = None, accelerator: Optional[BaseAccelerator] = None, @@ -309,6 +312,7 @@ def launch_dynamically(): :param task_resolver: Provide a custom task resolver. :param disable_deck: (deprecated) If true, this task will not output deck html file :param enable_deck: If true, this task will output deck html file + :param additional_decks: If specified, this task will output deck html file with the additional fields specified in the list :param docs: Documentation about this task :param pod_template: Custom PodTemplate for this task. :param pod_template_name: The name of the existing PodTemplate resource which will be used in this task. @@ -341,6 +345,7 @@ def wrapper(fn: Callable[..., Any]) -> PythonFunctionTask[T]: task_resolver=task_resolver, disable_deck=disable_deck, enable_deck=enable_deck, + additional_decks=additional_decks, docs=docs, pod_template=pod_template, pod_template_name=pod_template_name, diff --git a/flytekit/deck/deck.py b/flytekit/deck/deck.py index 3ce9d058a4..0fe91e07ba 100644 --- a/flytekit/deck/deck.py +++ b/flytekit/deck/deck.py @@ -1,3 +1,4 @@ +import enum import os import typing from typing import Optional @@ -10,6 +11,17 @@ DECK_FILE_NAME = "deck.html" +class DeckFields(str, enum.Enum): + """ + DeckFields is used to specify the fields that will be rendered in the deck. + """ + + INPUT = "Input" + OUTPUT = "Output" + SOURCE_CODE = "Source Code" + TIMELINE = "Timeline" + + class Deck: """ Deck enable users to get customizable and default visibility into their tasks. diff --git a/tests/flytekit/unit/core/test_flyte_file.py b/tests/flytekit/unit/core/test_flyte_file.py index 6e055ca399..108f8b394d 100644 --- a/tests/flytekit/unit/core/test_flyte_file.py +++ b/tests/flytekit/unit/core/test_flyte_file.py @@ -261,7 +261,7 @@ def my_wf() -> FlyteFile: def test_file_handling_remote_file_handling(): SAMPLE_DATA = "https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv" - @task + @task(enable_deck=False) def t1() -> FlyteFile: return SAMPLE_DATA @@ -312,7 +312,7 @@ def my_wf() -> FlyteFile: def test_file_handling_remote_file_handling_flyte_file(): SAMPLE_DATA = "https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv" - @task + @task(enable_deck=False) def t1() -> FlyteFile: # Unlike the test above, this returns the remote path wrapped in a FlyteFile object return FlyteFile(SAMPLE_DATA) @@ -606,7 +606,7 @@ def test_for_downloading(): @pytest.mark.sandbox_test def test_file_open_things(): - @task + @task(enable_deck=False) def write_this_file_to_s3() -> FlyteFile: ctx = FlyteContextManager.current_context() r = ctx.file_access.get_random_string() @@ -614,7 +614,7 @@ def write_this_file_to_s3() -> FlyteFile: ctx.file_access.put(__file__, dest) return FlyteFile(path=dest) - @task + @task(enable_deck=False) def copy_file(ff: FlyteFile) -> FlyteFile: new_file = FlyteFile.new_remote_file(ff.remote_path) with ff.open("r") as r: @@ -622,7 +622,7 @@ def copy_file(ff: FlyteFile) -> FlyteFile: w.write(r.read()) return new_file - @task + @task(enable_deck=False) def print_file(ff: FlyteFile): with open(ff, "r") as fh: print(len(fh.readlines())) From 16ef0d0fe86188d0d1e0ccf9fd603163685ffd47 Mon Sep 17 00:00:00 2001 From: novahow Date: Mon, 1 Apr 2024 21:03:13 +0800 Subject: [PATCH 02/15] add tests and remove confusing fields Signed-off-by: novahow modified: flytekit/core/base_task.py modified: flytekit/deck/deck.py modified: tests/flytekit/unit/deck/test_deck.py --- flytekit/core/base_task.py | 35 ++++--------------- flytekit/deck/deck.py | 2 -- tests/flytekit/unit/deck/test_deck.py | 48 ++++++++++++++++++++++++++- 3 files changed, 53 insertions(+), 32 deletions(-) diff --git a/flytekit/core/base_task.py b/flytekit/core/base_task.py index 4ff4df27e0..72d1181e08 100644 --- a/flytekit/core/base_task.py +++ b/flytekit/core/base_task.py @@ -491,12 +491,12 @@ def __init__( self._environment = environment if environment else {} self._task_config = task_config self._additional_decks = additional_decks if additional_decks is not None else [] - self._full_deck = enable_deck is True or disable_deck is False from flytekit.deck.deck import DeckFields deck_members = set([_field.value for _field in DeckFields]) - if self._full_deck: - self._additional_decks = list(deck_members - set(self.default_decks)) + full_deck = enable_deck is True or disable_deck is False + if full_deck: + self._additional_decks = list(deck_members) # enumerate additional decks, check if any of them are invalid for deck in self._additional_decks: if deck not in deck_members: @@ -510,7 +510,7 @@ def __init__( if disable_deck is not None: warnings.warn( - "disable_deck was deprecated in 1.10.0, please use enable_deck instead", + "disable_deck was deprecated in 1.10.0, please use enable_deck or additional_decks instead", FutureWarning, ) @@ -666,12 +666,12 @@ def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_param INPUT = DeckFields.INPUT.value OUTPUT = DeckFields.OUTPUT.value - if INPUT in self.decks: + if INPUT in self.additional_decks: input_deck = Deck(INPUT) for k, v in native_inputs.items(): input_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_input_var(k, v))) - if OUTPUT in self.decks: + if OUTPUT in self.additional_decks: output_deck = Deck(OUTPUT) for k, v in native_outputs_as_map.items(): output_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_output_var(k, v))) @@ -816,29 +816,6 @@ def additional_decks(self) -> List[str]: """ return self._additional_decks - @property - def default_decks(self) -> List[str]: - """ - returns the default decks that should be output for this task - """ - from flytekit.deck.deck import DeckFields - - return [DeckFields.TIMELINE, DeckFields.SOURCE_CODE] - - @property - def decks(self) -> List[str]: - """ - returns the decks that should be output for this task - """ - return self.default_decks + self.additional_decks - - @property - def full_deck(self) -> bool: - """ - returns whether the full deck should be output for this task - """ - return self._full_deck - class TaskResolverMixin(object): """ diff --git a/flytekit/deck/deck.py b/flytekit/deck/deck.py index 0fe91e07ba..e6ba20c8e3 100644 --- a/flytekit/deck/deck.py +++ b/flytekit/deck/deck.py @@ -18,8 +18,6 @@ class DeckFields(str, enum.Enum): INPUT = "Input" OUTPUT = "Output" - SOURCE_CODE = "Source Code" - TIMELINE = "Timeline" class Deck: diff --git a/tests/flytekit/unit/deck/test_deck.py b/tests/flytekit/unit/deck/test_deck.py index 9cf497bccd..652266fa4e 100644 --- a/tests/flytekit/unit/deck/test_deck.py +++ b/tests/flytekit/unit/deck/test_deck.py @@ -71,6 +71,52 @@ def t1(a: int) -> str: assert len(ctx.user_space_params.decks) == expected_decks +@pytest.mark.parametrize( + "additional_decks,expected_decks", + [ + ([], 2), # time line deck + source code deck + (["Input"], 3), # time line deck + source code deck + input deck + (["Output", "Input"], 4), # time line deck + source code deck + input and output decks + (None, 2), # time line deck + source code deck + ], +) +def test_additional_deck_for_task(additional_decks, expected_decks): + ctx = FlyteContextManager.current_context() + + kwargs = {} + kwargs["additional_decks"] = additional_decks + + @task(**kwargs) + def t1(a: int) -> str: + return str(a) + + t1(a=3) + assert len(ctx.user_space_params.decks) == expected_decks + + +@pytest.mark.parametrize( + "additional_decks,enable_deck,disable_deck", + [ + ([], True, None), + (["Input"], None, False), + (["Output", "Input"], False, None), + (None, True, False), + (["WrongDeck", "Input", "Output"], None, None), # WrongDeck is not a valid field + ], +) +def test_invalid_deck_params(additional_decks, enable_deck, disable_deck): + kwargs = {} + kwargs["additional_decks"] = additional_decks + kwargs["enable_deck"] = enable_deck + kwargs["disable_deck"] = disable_deck + + with pytest.raises(ValueError): + + @task(**kwargs) + def t1(a: int) -> str: + return str(a) + + @pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") @pytest.mark.filterwarnings("ignore:disable_deck was deprecated") @pytest.mark.parametrize( @@ -129,7 +175,7 @@ def t_df(a: str) -> int: def test_deck_deprecation_warning_disable_deck(): - warn_msg = "disable_deck was deprecated in 1.10.0, please use enable_deck instead" + warn_msg = "disable_deck was deprecated in 1.10.0, please use enable_deck or additional_decks instead" with pytest.warns(FutureWarning, match=warn_msg): @task(disable_deck=False) From ddccf6144b658df1055d1ee151ab100e9602b5fd Mon Sep 17 00:00:00 2001 From: novahow Date: Thu, 4 Apr 2024 12:38:34 +0800 Subject: [PATCH 03/15] add deckselector Signed-off-by: novahow modified: flytekit/core/base_task.py modified: flytekit/core/context_manager.py modified: flytekit/core/task.py modified: flytekit/deck/deck.py modified: tests/flytekit/unit/deck/test_deck.py --- flytekit/core/base_task.py | 46 +++++++++++++-------------- flytekit/core/context_manager.py | 19 +++++++++-- flytekit/core/task.py | 8 ++--- flytekit/deck/deck.py | 10 +++++- tests/flytekit/unit/deck/test_deck.py | 8 ++--- 5 files changed, 57 insertions(+), 34 deletions(-) diff --git a/flytekit/core/base_task.py b/flytekit/core/base_task.py index 72d1181e08..f072285b89 100644 --- a/flytekit/core/base_task.py +++ b/flytekit/core/base_task.py @@ -68,6 +68,7 @@ from flytekit.core.tracker import TrackedInstance from flytekit.core.type_engine import TypeEngine, TypeTransformerFailedError from flytekit.core.utils import timeit +from flytekit.deck.deck import DeckFields from flytekit.loggers import logger from flytekit.models import dynamic_job as _dynamic_job from flytekit.models import interface as _interface_models @@ -462,7 +463,7 @@ def __init__( environment: Optional[Dict[str, str]] = None, disable_deck: Optional[bool] = None, enable_deck: Optional[bool] = None, - additional_decks: Optional[List[str]] = None, + deck_selector: Optional[List[DeckFields]] = None, **kwargs, ): """ @@ -478,8 +479,8 @@ def __init__( execution of the task. Supplied as a dictionary of key/value pairs disable_deck (bool): (deprecated) If true, this task will not output deck html file enable_deck (bool): If true, this task will output deck html file - additional_decks (Optional[List[str]]): List of additional decks besides [timeline and source code] to be - generated for this task. Valid values can be selected from [Input, Output] + deck_selector (Optional[List[DeckFields]]): List of decks to be + generated for this task. Valid values can be selected from fields of ``flytekit.deck.deck.DeckFields`` enum """ super().__init__( task_type=task_type, @@ -490,31 +491,30 @@ def __init__( self._python_interface = interface if interface else Interface() self._environment = environment if environment else {} self._task_config = task_config - self._additional_decks = additional_decks if additional_decks is not None else [] - from flytekit.deck.deck import DeckFields + self._deck_selector = deck_selector if deck_selector is not None else [] - deck_members = set([_field.value for _field in DeckFields]) + deck_members = set([_field for _field in DeckFields]) full_deck = enable_deck is True or disable_deck is False if full_deck: - self._additional_decks = list(deck_members) + self._deck_selector = list(deck_members) # enumerate additional decks, check if any of them are invalid - for deck in self._additional_decks: + for deck in self._deck_selector: if deck not in deck_members: raise ValueError(f"Deck field {deck} is not a valid deck field. Please use one of {deck_members}") - # first we resolve the conflict between params regarding decks, if any two of [disable_deck, enable_deck, additional_decks] + # first we resolve the conflict between params regarding decks, if any two of [disable_deck, enable_deck, deck_selector] # are set, we raise an error - configured_deck_params = [disable_deck is not None, enable_deck is not None, additional_decks is not None] + configured_deck_params = [disable_deck is not None, enable_deck is not None, deck_selector is not None] if sum(configured_deck_params) > 1: - raise ValueError("only one of [disable_deck, enable_deck and additional_decks] can be set") + raise ValueError("only one of [disable_deck, enable_deck and deck_selector] can be set") if disable_deck is not None: warnings.warn( - "disable_deck was deprecated in 1.10.0, please use enable_deck or additional_decks instead", + "disable_deck was deprecated in 1.10.0, please use enable_deck or deck_selector instead", FutureWarning, ) - decks_triggered: bool = enable_deck is not False or additional_decks is not None + decks_triggered: bool = enable_deck is True or deck_selector is not None if enable_deck is not None: self._disable_deck = not enable_deck elif disable_deck is not None: @@ -663,16 +663,16 @@ def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_param if self._disable_deck is False: from flytekit.deck.deck import Deck, DeckFields, _output_deck - INPUT = DeckFields.INPUT.value - OUTPUT = DeckFields.OUTPUT.value + INPUT = DeckFields.INPUT + OUTPUT = DeckFields.OUTPUT - if INPUT in self.additional_decks: - input_deck = Deck(INPUT) + if DeckFields.INPUT in self.deck_selector: + input_deck = Deck(INPUT.value) for k, v in native_inputs.items(): input_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_input_var(k, v))) - - if OUTPUT in self.additional_decks: - output_deck = Deck(OUTPUT) + + if DeckFields.OUTPUT in self.deck_selector: + output_deck = Deck(OUTPUT.value) for k, v in native_outputs_as_map.items(): output_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_output_var(k, v))) @@ -775,7 +775,7 @@ def pre_execute(self, user_params: Optional[ExecutionParameters]) -> Optional[Ex This should return either the same context of the mutated context """ - return user_params + return user_params.with_rendered_decks(self._deck_selector).build() @abstractmethod def execute(self, **kwargs) -> Any: @@ -810,11 +810,11 @@ def disable_deck(self) -> bool: return self._disable_deck @property - def additional_decks(self) -> List[str]: + def deck_selector(self) -> List[DeckFields]: """ If not empty, this task will output deck html file for the specified decks """ - return self._additional_decks + return self._deck_selector class TaskResolverMixin(object): diff --git a/flytekit/core/context_manager.py b/flytekit/core/context_manager.py index f70f10bc94..0adaaafabb 100644 --- a/flytekit/core/context_manager.py +++ b/flytekit/core/context_manager.py @@ -39,6 +39,7 @@ if typing.TYPE_CHECKING: from flytekit import Deck + from flytekit.deck.deck import DeckFields from flytekit.clients import friendly as friendly_client # noqa # TODO: resolve circular import from flytekit.core.python_auto_container import TaskResolverMixin @@ -89,6 +90,7 @@ class Builder(object): execution_date: typing.Optional[datetime] = None logging: Optional[_logging.Logger] = None task_id: typing.Optional[_identifier.Identifier] = None + rendered_decks: Optional[List[DeckFields]] = None def __init__(self, current: typing.Optional[ExecutionParameters] = None): self.stats = current.stats if current else None @@ -101,6 +103,7 @@ def __init__(self, current: typing.Optional[ExecutionParameters] = None): self.attrs = current._attrs if current else {} self.raw_output_prefix = current.raw_output_prefix if current else None self.task_id = current.task_id if current else None + self.rendered_decks = current.rendered_decks if current else [] def add_attr(self, key: str, v: typing.Any) -> ExecutionParameters.Builder: self.attrs[key] = v @@ -119,6 +122,7 @@ def build(self) -> ExecutionParameters: decks=self.decks, raw_output_prefix=self.raw_output_prefix, task_id=self.task_id, + rendered_decks=self.rendered_decks, **self.attrs, ) @@ -139,6 +143,11 @@ def with_task_sandbox(self) -> Builder: b.checkpoint = cp b.working_dir = task_sandbox_dir return b + + def with_rendered_decks(self, rendered_decks: List[DeckFields]) -> Builder: + b = self.new_builder(self) + b.rendered_decks = rendered_decks + return b def builder(self) -> Builder: return ExecutionParameters.Builder(current=self) @@ -155,6 +164,7 @@ def __init__( checkpoint=None, decks=None, task_id: typing.Optional[_identifier.Identifier] = None, + rendered_decks=None, **kwargs, ): """ @@ -182,6 +192,7 @@ def __init__( self._checkpoint = checkpoint self._decks = decks self._task_id = task_id + self._rendered_decks = [] if rendered_decks is None else rendered_decks @property def stats(self) -> taggable.TaggableStats: @@ -274,7 +285,7 @@ def default_deck(self) -> Deck: @property def timeline_deck(self) -> "TimeLineDeck": # type: ignore - from flytekit.deck.deck import TimeLineDeck + from flytekit.deck.deck import TimeLineDeck, DeckFields time_line_deck = None for deck in self.decks: @@ -282,10 +293,14 @@ def timeline_deck(self) -> "TimeLineDeck": # type: ignore time_line_deck = deck break if time_line_deck is None: - time_line_deck = TimeLineDeck("Timeline") + time_line_deck = TimeLineDeck(DeckFields.TIMELINE.value) return time_line_deck + @property + def rendered_decks(self) -> List[DeckFields]: + return self._rendered_decks + def __getattr__(self, attr_name: str) -> typing.Any: """ This houses certain task specific context. For example in Spark, it houses the SparkSession, etc diff --git a/flytekit/core/task.py b/flytekit/core/task.py index 506dca99fb..16331b7eac 100644 --- a/flytekit/core/task.py +++ b/flytekit/core/task.py @@ -114,7 +114,7 @@ def task( docs: Optional[Documentation] = ..., disable_deck: Optional[bool] = ..., enable_deck: Optional[bool] = ..., - additional_decks: Optional[List[str]] = ..., + deck_selector: Optional[List[str]] = ..., pod_template: Optional["PodTemplate"] = ..., pod_template_name: Optional[str] = ..., accelerator: Optional[BaseAccelerator] = ..., @@ -153,7 +153,7 @@ def task( docs: Optional[Documentation] = ..., disable_deck: Optional[bool] = ..., enable_deck: Optional[bool] = ..., - additional_decks: Optional[List[str]] = ..., + deck_selector: Optional[List[str]] = ..., pod_template: Optional["PodTemplate"] = ..., pod_template_name: Optional[str] = ..., accelerator: Optional[BaseAccelerator] = ..., @@ -191,7 +191,7 @@ def task( docs: Optional[Documentation] = None, disable_deck: Optional[bool] = None, enable_deck: Optional[bool] = None, - additional_decks: Optional[List[str]] = None, + deck_selector: Optional[List[str]] = None, pod_template: Optional["PodTemplate"] = None, pod_template_name: Optional[str] = None, accelerator: Optional[BaseAccelerator] = None, @@ -345,7 +345,7 @@ def wrapper(fn: Callable[..., Any]) -> PythonFunctionTask[T]: task_resolver=task_resolver, disable_deck=disable_deck, enable_deck=enable_deck, - additional_decks=additional_decks, + deck_selector=deck_selector, docs=docs, pod_template=pod_template, pod_template_name=pod_template_name, diff --git a/flytekit/deck/deck.py b/flytekit/deck/deck.py index e6ba20c8e3..f58ebd2852 100644 --- a/flytekit/deck/deck.py +++ b/flytekit/deck/deck.py @@ -18,6 +18,8 @@ class DeckFields(str, enum.Enum): INPUT = "Input" OUTPUT = "Output" + SOURCE_CODE = "Source Code" + TIMELINE = "Timeline" class Deck: @@ -138,7 +140,13 @@ def _get_deck( Get flyte deck html string If ignore_jupyter is set to True, then it will return a str even in a jupyter environment. """ - deck_map = {deck.name: deck.html for deck in new_user_params.decks} + deck_members = set([_field.value for _field in DeckFields]) + rendered_decks = new_user_params.rendered_decks + deck_map = {deck.name: deck.html for deck in new_user_params.decks + if deck.name in rendered_decks or deck.name not in deck_members} + + import pdb + # pdb.set_trace() raw_html = get_deck_template().render(metadata=deck_map) if not ignore_jupyter and ipython_check(): try: diff --git a/tests/flytekit/unit/deck/test_deck.py b/tests/flytekit/unit/deck/test_deck.py index 652266fa4e..bdd9ef704b 100644 --- a/tests/flytekit/unit/deck/test_deck.py +++ b/tests/flytekit/unit/deck/test_deck.py @@ -8,7 +8,7 @@ import flytekit from flytekit import Deck, FlyteContextManager, task from flytekit.deck import MarkdownRenderer, SourceCodeRenderer, TopFrameRenderer -from flytekit.deck.deck import _output_deck +from flytekit.deck.deck import DeckFields, _output_deck from flytekit.deck.renderer import PythonDependencyRenderer @@ -75,7 +75,7 @@ def t1(a: int) -> str: "additional_decks,expected_decks", [ ([], 2), # time line deck + source code deck - (["Input"], 3), # time line deck + source code deck + input deck + ([DeckFields.INPUT], 3), # time line deck + source code deck + input deck (["Output", "Input"], 4), # time line deck + source code deck + input and output decks (None, 2), # time line deck + source code deck ], @@ -98,8 +98,8 @@ def t1(a: int) -> str: "additional_decks,enable_deck,disable_deck", [ ([], True, None), - (["Input"], None, False), - (["Output", "Input"], False, None), + ([DeckFields.INPUT], None, False), + ([DeckFields.INPUT, DeckFields.OUTPUT], False, None), (None, True, False), (["WrongDeck", "Input", "Output"], None, None), # WrongDeck is not a valid field ], From d9297950df08bd1da0d49bb941deea7f41b0cf7f Mon Sep 17 00:00:00 2001 From: novahow Date: Thu, 4 Apr 2024 17:31:51 +0800 Subject: [PATCH 04/15] make deck_selector to tuple Signed-off-by: novahow --- flytekit/core/base_task.py | 42 ++++++++++----------- flytekit/core/context_manager.py | 11 +++--- flytekit/core/task.py | 10 ++--- flytekit/deck/deck.py | 17 +++++---- tests/flytekit/unit/deck/test_deck.py | 54 ++++++++++++++++----------- 5 files changed, 73 insertions(+), 61 deletions(-) diff --git a/flytekit/core/base_task.py b/flytekit/core/base_task.py index f072285b89..0ed4456867 100644 --- a/flytekit/core/base_task.py +++ b/flytekit/core/base_task.py @@ -463,7 +463,7 @@ def __init__( environment: Optional[Dict[str, str]] = None, disable_deck: Optional[bool] = None, enable_deck: Optional[bool] = None, - deck_selector: Optional[List[DeckFields]] = None, + decks: Optional[Tuple[str, ...]] = None, **kwargs, ): """ @@ -479,7 +479,7 @@ def __init__( execution of the task. Supplied as a dictionary of key/value pairs disable_deck (bool): (deprecated) If true, this task will not output deck html file enable_deck (bool): If true, this task will output deck html file - deck_selector (Optional[List[DeckFields]]): List of decks to be + decks (Tuple[str]): Tuple of decks to be generated for this task. Valid values can be selected from fields of ``flytekit.deck.deck.DeckFields`` enum """ super().__init__( @@ -491,36 +491,34 @@ def __init__( self._python_interface = interface if interface else Interface() self._environment = environment if environment else {} self._task_config = task_config - self._deck_selector = deck_selector if deck_selector is not None else [] + self._decks = list(decks) if (decks is not None and enable_deck is True) else [] - deck_members = set([_field for _field in DeckFields]) - full_deck = enable_deck is True or disable_deck is False - if full_deck: - self._deck_selector = list(deck_members) + deck_members = set([_field.value for _field in DeckFields]) # enumerate additional decks, check if any of them are invalid - for deck in self._deck_selector: + for deck in self._decks: if deck not in deck_members: - raise ValueError(f"Deck field {deck} is not a valid deck field. Please use one of {deck_members}") + raise ValueError( + f"Element {deck} from decks param is not a valid deck field. Please use one of {deck_members}" + ) - # first we resolve the conflict between params regarding decks, if any two of [disable_deck, enable_deck, deck_selector] + # first we resolve the conflict between params regarding decks, if any two of [disable_deck, enable_deck] # are set, we raise an error - configured_deck_params = [disable_deck is not None, enable_deck is not None, deck_selector is not None] + configured_deck_params = [disable_deck is not None, enable_deck is not None] if sum(configured_deck_params) > 1: - raise ValueError("only one of [disable_deck, enable_deck and deck_selector] can be set") + raise ValueError("only one of [disable_deck, enable_deck] can be set") if disable_deck is not None: warnings.warn( - "disable_deck was deprecated in 1.10.0, please use enable_deck or deck_selector instead", + "disable_deck was deprecated in 1.10.0, please use enable_deck and decks instead", FutureWarning, ) - decks_triggered: bool = enable_deck is True or deck_selector is not None if enable_deck is not None: self._disable_deck = not enable_deck elif disable_deck is not None: self._disable_deck = disable_deck else: - self._disable_deck = not decks_triggered + self._disable_deck = True if self._python_interface.docstring: if self.docs is None: @@ -666,12 +664,12 @@ def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_param INPUT = DeckFields.INPUT OUTPUT = DeckFields.OUTPUT - if DeckFields.INPUT in self.deck_selector: + if DeckFields.INPUT in self.decks: input_deck = Deck(INPUT.value) for k, v in native_inputs.items(): input_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_input_var(k, v))) - - if DeckFields.OUTPUT in self.deck_selector: + + if DeckFields.OUTPUT in self.decks: output_deck = Deck(OUTPUT.value) for k, v in native_outputs_as_map.items(): output_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_output_var(k, v))) @@ -775,7 +773,9 @@ def pre_execute(self, user_params: Optional[ExecutionParameters]) -> Optional[Ex This should return either the same context of the mutated context """ - return user_params.with_rendered_decks(self._deck_selector).build() + if user_params is None: + return user_params + return user_params.with_rendered_decks(self.decks).build() @abstractmethod def execute(self, **kwargs) -> Any: @@ -810,11 +810,11 @@ def disable_deck(self) -> bool: return self._disable_deck @property - def deck_selector(self) -> List[DeckFields]: + def decks(self) -> List[str]: """ If not empty, this task will output deck html file for the specified decks """ - return self._deck_selector + return self._decks class TaskResolverMixin(object): diff --git a/flytekit/core/context_manager.py b/flytekit/core/context_manager.py index 0adaaafabb..de305f02a3 100644 --- a/flytekit/core/context_manager.py +++ b/flytekit/core/context_manager.py @@ -39,7 +39,6 @@ if typing.TYPE_CHECKING: from flytekit import Deck - from flytekit.deck.deck import DeckFields from flytekit.clients import friendly as friendly_client # noqa # TODO: resolve circular import from flytekit.core.python_auto_container import TaskResolverMixin @@ -90,7 +89,7 @@ class Builder(object): execution_date: typing.Optional[datetime] = None logging: Optional[_logging.Logger] = None task_id: typing.Optional[_identifier.Identifier] = None - rendered_decks: Optional[List[DeckFields]] = None + rendered_decks: Optional[List[str]] = None def __init__(self, current: typing.Optional[ExecutionParameters] = None): self.stats = current.stats if current else None @@ -143,8 +142,8 @@ def with_task_sandbox(self) -> Builder: b.checkpoint = cp b.working_dir = task_sandbox_dir return b - - def with_rendered_decks(self, rendered_decks: List[DeckFields]) -> Builder: + + def with_rendered_decks(self, rendered_decks: List[str]) -> Builder: b = self.new_builder(self) b.rendered_decks = rendered_decks return b @@ -285,7 +284,7 @@ def default_deck(self) -> Deck: @property def timeline_deck(self) -> "TimeLineDeck": # type: ignore - from flytekit.deck.deck import TimeLineDeck, DeckFields + from flytekit.deck.deck import DeckFields, TimeLineDeck time_line_deck = None for deck in self.decks: @@ -298,7 +297,7 @@ def timeline_deck(self) -> "TimeLineDeck": # type: ignore return time_line_deck @property - def rendered_decks(self) -> List[DeckFields]: + def rendered_decks(self) -> List[str]: return self._rendered_decks def __getattr__(self, attr_name: str) -> typing.Any: diff --git a/flytekit/core/task.py b/flytekit/core/task.py index 16331b7eac..8b9282339d 100644 --- a/flytekit/core/task.py +++ b/flytekit/core/task.py @@ -114,7 +114,7 @@ def task( docs: Optional[Documentation] = ..., disable_deck: Optional[bool] = ..., enable_deck: Optional[bool] = ..., - deck_selector: Optional[List[str]] = ..., + decks: Optional[Tuple[str, ...]] = ..., pod_template: Optional["PodTemplate"] = ..., pod_template_name: Optional[str] = ..., accelerator: Optional[BaseAccelerator] = ..., @@ -153,7 +153,7 @@ def task( docs: Optional[Documentation] = ..., disable_deck: Optional[bool] = ..., enable_deck: Optional[bool] = ..., - deck_selector: Optional[List[str]] = ..., + decks: Optional[Tuple[str, ...]] = ..., pod_template: Optional["PodTemplate"] = ..., pod_template_name: Optional[str] = ..., accelerator: Optional[BaseAccelerator] = ..., @@ -191,7 +191,7 @@ def task( docs: Optional[Documentation] = None, disable_deck: Optional[bool] = None, enable_deck: Optional[bool] = None, - deck_selector: Optional[List[str]] = None, + decks: Optional[Tuple[str, ...]] = ("source_code", "dependencies"), pod_template: Optional["PodTemplate"] = None, pod_template_name: Optional[str] = None, accelerator: Optional[BaseAccelerator] = None, @@ -312,7 +312,7 @@ def launch_dynamically(): :param task_resolver: Provide a custom task resolver. :param disable_deck: (deprecated) If true, this task will not output deck html file :param enable_deck: If true, this task will output deck html file - :param additional_decks: If specified, this task will output deck html file with the additional fields specified in the list + :param decks: If specified and enble_deck is True, this task will output deck html file with the fields specified in the list :param docs: Documentation about this task :param pod_template: Custom PodTemplate for this task. :param pod_template_name: The name of the existing PodTemplate resource which will be used in this task. @@ -345,7 +345,7 @@ def wrapper(fn: Callable[..., Any]) -> PythonFunctionTask[T]: task_resolver=task_resolver, disable_deck=disable_deck, enable_deck=enable_deck, - deck_selector=deck_selector, + decks=decks, docs=docs, pod_template=pod_template, pod_template_name=pod_template_name, diff --git a/flytekit/deck/deck.py b/flytekit/deck/deck.py index f58ebd2852..bc89f91603 100644 --- a/flytekit/deck/deck.py +++ b/flytekit/deck/deck.py @@ -16,10 +16,11 @@ class DeckFields(str, enum.Enum): DeckFields is used to specify the fields that will be rendered in the deck. """ - INPUT = "Input" - OUTPUT = "Output" - SOURCE_CODE = "Source Code" + INPUT = "input" + OUTPUT = "output" + SOURCE_CODE = "source_code" TIMELINE = "Timeline" + DEPENDENCIES = "dependencies" class Deck: @@ -142,10 +143,12 @@ def _get_deck( """ deck_members = set([_field.value for _field in DeckFields]) rendered_decks = new_user_params.rendered_decks - deck_map = {deck.name: deck.html for deck in new_user_params.decks - if deck.name in rendered_decks or deck.name not in deck_members} - - import pdb + deck_map = { + deck.name: deck.html + for deck in new_user_params.decks + if deck.name in rendered_decks or deck.name not in deck_members + } + # pdb.set_trace() raw_html = get_deck_template().render(metadata=deck_map) if not ignore_jupyter and ipython_check(): diff --git a/tests/flytekit/unit/deck/test_deck.py b/tests/flytekit/unit/deck/test_deck.py index bdd9ef704b..19e37eb9a4 100644 --- a/tests/flytekit/unit/deck/test_deck.py +++ b/tests/flytekit/unit/deck/test_deck.py @@ -52,7 +52,7 @@ def test_timeline_deck(): "disable_deck,expected_decks", [ (None, 1), # time line deck - (False, 5), # time line deck + source code deck + python dependency deck + input and output decks + (False, 3), # time line deck + source code deck + python dependency deck (True, 1), # time line deck ], ) @@ -72,19 +72,27 @@ def t1(a: int) -> str: @pytest.mark.parametrize( - "additional_decks,expected_decks", + "decks,enable_deck,expected_decks", [ - ([], 2), # time line deck + source code deck - ([DeckFields.INPUT], 3), # time line deck + source code deck + input deck - (["Output", "Input"], 4), # time line deck + source code deck + input and output decks - (None, 2), # time line deck + source code deck + ((), True, 3), # time line deck + source code deck + python dependency deck + ((DeckFields.INPUT.value), False, 1), # time line deck + ( + (DeckFields.OUTPUT.value, DeckFields.INPUT.value, DeckFields.TIMELINE.value, DeckFields.DEPENDENCIES.value), + True, + 5, # time line deck + source code deck + dependency + input and output decks + ), + (None, None, 1), # time line deck + source code deck ], ) -def test_additional_deck_for_task(additional_decks, expected_decks): +@mock.patch("flytekit.deck.deck._output_deck") +def test_additional_deck_for_task(_output_deck, decks, enable_deck, expected_decks): ctx = FlyteContextManager.current_context() kwargs = {} - kwargs["additional_decks"] = additional_decks + if decks is not None: + kwargs["decks"] = decks + if enable_deck is not None: + kwargs["enable_deck"] = enable_deck @task(**kwargs) def t1(a: int) -> str: @@ -92,23 +100,25 @@ def t1(a: int) -> str: t1(a=3) assert len(ctx.user_space_params.decks) == expected_decks + if enable_deck is True: + assert len(_output_deck.call_args[0][1].rendered_decks) == len(decks) @pytest.mark.parametrize( - "additional_decks,enable_deck,disable_deck", + "decks,enable_deck,disable_deck", [ - ([], True, None), - ([DeckFields.INPUT], None, False), - ([DeckFields.INPUT, DeckFields.OUTPUT], False, None), (None, True, False), - (["WrongDeck", "Input", "Output"], None, None), # WrongDeck is not a valid field + (("WrongDeck", DeckFields.INPUT.value, DeckFields.OUTPUT.value), True, None), # WrongDeck is not a valid field ], ) -def test_invalid_deck_params(additional_decks, enable_deck, disable_deck): +def test_invalid_deck_params(decks, enable_deck, disable_deck): kwargs = {} - kwargs["additional_decks"] = additional_decks - kwargs["enable_deck"] = enable_deck - kwargs["disable_deck"] = disable_deck + if decks is not None: + kwargs["decks"] = decks + if enable_deck is not None: + kwargs["enable_deck"] = enable_deck + if disable_deck is not None: + kwargs["disable_deck"] = disable_deck with pytest.raises(ValueError): @@ -126,16 +136,16 @@ def t1(a: int) -> str: ( None, False, - 6, + 4, False, - ), # default deck and time line deck + source code deck + python dependency deck + input and output decks + ), # default deck and time line deck + source code deck + python dependency deck (None, True, 2, False), # default deck and time line deck ( True, None, - 6, + 4, False, - ), # default deck and time line deck + source code deck + python dependency deck + input and output decks + ), # default deck and time line deck + source code deck + python dependency deck (False, None, 2, False), # default deck and time line deck (True, True, -1, True), # Set both disable_deck and enable_deck to True and confirm that it fails (False, False, -1, True), # Set both disable_deck and enable_deck to False and confirm that it fails @@ -175,7 +185,7 @@ def t_df(a: str) -> int: def test_deck_deprecation_warning_disable_deck(): - warn_msg = "disable_deck was deprecated in 1.10.0, please use enable_deck or additional_decks instead" + warn_msg = "disable_deck was deprecated in 1.10.0, please use enable_deck and decks instead" with pytest.warns(FutureWarning, match=warn_msg): @task(disable_deck=False) From d504753cd74ff1b1ad5f399d5e5c3573dfbdad0d Mon Sep 17 00:00:00 2001 From: novahow Date: Fri, 5 Apr 2024 01:48:39 +0800 Subject: [PATCH 05/15] fix remote deck bug Signed-off-by: novahow --- flytekit/bin/entrypoint.py | 2 +- flytekit/core/base_task.py | 5 +++-- flytekit/core/context_manager.py | 3 +++ flytekit/deck/deck.py | 1 - 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index 92f56409ec..b7dc48a624 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -174,7 +174,7 @@ def _dispatch_execute( logger.info(f"Engine folder written successfully to the output prefix {output_prefix}") if not getattr(task_def, "disable_deck", True): - _output_deck(task_def.name.split(".")[-1], ctx.user_space_params) + _output_deck(task_def.name.split(".")[-1], ctx.user_space_params.with_rendered_decks(task_def.decks).build()) logger.debug("Finished _dispatch_execute") diff --git a/flytekit/core/base_task.py b/flytekit/core/base_task.py index 0ed4456867..3b98727a59 100644 --- a/flytekit/core/base_task.py +++ b/flytekit/core/base_task.py @@ -491,7 +491,7 @@ def __init__( self._python_interface = interface if interface else Interface() self._environment = environment if environment else {} self._task_config = task_config - self._decks = list(decks) if (decks is not None and enable_deck is True) else [] + self._decks = list(decks) if (decks is not None and (enable_deck is True or disable_deck is False)) else [] deck_members = set([_field.value for _field in DeckFields]) # enumerate additional decks, check if any of them are invalid @@ -775,7 +775,8 @@ def pre_execute(self, user_params: Optional[ExecutionParameters]) -> Optional[Ex """ if user_params is None: return user_params - return user_params.with_rendered_decks(self.decks).build() + new_param = user_params.with_rendered_decks(self.decks).build() + return new_param @abstractmethod def execute(self, **kwargs) -> Any: diff --git a/flytekit/core/context_manager.py b/flytekit/core/context_manager.py index de305f02a3..96d2117c73 100644 --- a/flytekit/core/context_manager.py +++ b/flytekit/core/context_manager.py @@ -90,6 +90,7 @@ class Builder(object): logging: Optional[_logging.Logger] = None task_id: typing.Optional[_identifier.Identifier] = None rendered_decks: Optional[List[str]] = None + output_metadata_prefix: Optional[str] = None def __init__(self, current: typing.Optional[ExecutionParameters] = None): self.stats = current.stats if current else None @@ -103,6 +104,7 @@ def __init__(self, current: typing.Optional[ExecutionParameters] = None): self.raw_output_prefix = current.raw_output_prefix if current else None self.task_id = current.task_id if current else None self.rendered_decks = current.rendered_decks if current else [] + self.output_metadata_prefix = current.output_metadata_prefix if current else None def add_attr(self, key: str, v: typing.Any) -> ExecutionParameters.Builder: self.attrs[key] = v @@ -122,6 +124,7 @@ def build(self) -> ExecutionParameters: raw_output_prefix=self.raw_output_prefix, task_id=self.task_id, rendered_decks=self.rendered_decks, + output_metadata_prefix=self.output_metadata_prefix, **self.attrs, ) diff --git a/flytekit/deck/deck.py b/flytekit/deck/deck.py index bc89f91603..172f2abd25 100644 --- a/flytekit/deck/deck.py +++ b/flytekit/deck/deck.py @@ -149,7 +149,6 @@ def _get_deck( if deck.name in rendered_decks or deck.name not in deck_members } - # pdb.set_trace() raw_html = get_deck_template().render(metadata=deck_map) if not ignore_jupyter and ipython_check(): try: From 09515f629fe1734e6edac1e9bb4f7d561b6a8ed7 Mon Sep 17 00:00:00 2001 From: novahow Date: Mon, 8 Apr 2024 17:42:58 +0800 Subject: [PATCH 06/15] fix timelinedeck and remove rendered_deck param Signed-off-by: novahow --- flytekit/bin/entrypoint.py | 2 +- flytekit/core/base_task.py | 21 ++++++-------- flytekit/core/context_manager.py | 21 ++++---------- flytekit/core/python_function_task.py | 8 ++++-- flytekit/deck/deck.py | 17 ++++------- tests/flytekit/unit/core/test_flyte_file.py | 10 +++---- tests/flytekit/unit/core/test_utils.py | 3 ++ tests/flytekit/unit/deck/test_deck.py | 32 ++++++++++----------- 8 files changed, 51 insertions(+), 63 deletions(-) diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index b7dc48a624..92f56409ec 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -174,7 +174,7 @@ def _dispatch_execute( logger.info(f"Engine folder written successfully to the output prefix {output_prefix}") if not getattr(task_def, "disable_deck", True): - _output_deck(task_def.name.split(".")[-1], ctx.user_space_params.with_rendered_decks(task_def.decks).build()) + _output_deck(task_def.name.split(".")[-1], ctx.user_space_params) logger.debug("Finished _dispatch_execute") diff --git a/flytekit/core/base_task.py b/flytekit/core/base_task.py index 3b98727a59..3069b9371c 100644 --- a/flytekit/core/base_task.py +++ b/flytekit/core/base_task.py @@ -664,15 +664,13 @@ def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_param INPUT = DeckFields.INPUT OUTPUT = DeckFields.OUTPUT - if DeckFields.INPUT in self.decks: - input_deck = Deck(INPUT.value) - for k, v in native_inputs.items(): - input_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_input_var(k, v))) + input_deck = Deck(INPUT.value, auto_add_to_deck=DeckFields.INPUT in self.decks) + for k, v in native_inputs.items(): + input_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_input_var(k, v))) - if DeckFields.OUTPUT in self.decks: - output_deck = Deck(OUTPUT.value) - for k, v in native_outputs_as_map.items(): - output_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_output_var(k, v))) + output_deck = Deck(OUTPUT.value, auto_add_to_deck=DeckFields.OUTPUT in self.decks) + for k, v in native_outputs_as_map.items(): + output_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_output_var(k, v))) if ctx.execution_state and ctx.execution_state.is_local_execution(): # When we run the workflow remotely, flytekit outputs decks at the end of _dispatch_execute @@ -697,6 +695,8 @@ def dispatch_execute( may be none * ``DynamicJobSpec`` is returned when a dynamic workflow is executed """ + if DeckFields.TIMELINE.value in self.decks and ctx.user_space_params is not None: + ctx.user_space_params.decks.append(ctx.user_space_params.timeline_deck) # Invoked before the task is executed new_user_params = self.pre_execute(ctx.user_space_params) @@ -773,10 +773,7 @@ def pre_execute(self, user_params: Optional[ExecutionParameters]) -> Optional[Ex This should return either the same context of the mutated context """ - if user_params is None: - return user_params - new_param = user_params.with_rendered_decks(self.decks).build() - return new_param + return user_params @abstractmethod def execute(self, **kwargs) -> Any: diff --git a/flytekit/core/context_manager.py b/flytekit/core/context_manager.py index 96d2117c73..0802218a2b 100644 --- a/flytekit/core/context_manager.py +++ b/flytekit/core/context_manager.py @@ -89,7 +89,6 @@ class Builder(object): execution_date: typing.Optional[datetime] = None logging: Optional[_logging.Logger] = None task_id: typing.Optional[_identifier.Identifier] = None - rendered_decks: Optional[List[str]] = None output_metadata_prefix: Optional[str] = None def __init__(self, current: typing.Optional[ExecutionParameters] = None): @@ -103,7 +102,6 @@ def __init__(self, current: typing.Optional[ExecutionParameters] = None): self.attrs = current._attrs if current else {} self.raw_output_prefix = current.raw_output_prefix if current else None self.task_id = current.task_id if current else None - self.rendered_decks = current.rendered_decks if current else [] self.output_metadata_prefix = current.output_metadata_prefix if current else None def add_attr(self, key: str, v: typing.Any) -> ExecutionParameters.Builder: @@ -123,7 +121,6 @@ def build(self) -> ExecutionParameters: decks=self.decks, raw_output_prefix=self.raw_output_prefix, task_id=self.task_id, - rendered_decks=self.rendered_decks, output_metadata_prefix=self.output_metadata_prefix, **self.attrs, ) @@ -146,11 +143,6 @@ def with_task_sandbox(self) -> Builder: b.working_dir = task_sandbox_dir return b - def with_rendered_decks(self, rendered_decks: List[str]) -> Builder: - b = self.new_builder(self) - b.rendered_decks = rendered_decks - return b - def builder(self) -> Builder: return ExecutionParameters.Builder(current=self) @@ -166,7 +158,6 @@ def __init__( checkpoint=None, decks=None, task_id: typing.Optional[_identifier.Identifier] = None, - rendered_decks=None, **kwargs, ): """ @@ -194,7 +185,7 @@ def __init__( self._checkpoint = checkpoint self._decks = decks self._task_id = task_id - self._rendered_decks = [] if rendered_decks is None else rendered_decks + self._timeline_deck = None @property def stats(self) -> taggable.TaggableStats: @@ -295,14 +286,14 @@ def timeline_deck(self) -> "TimeLineDeck": # type: ignore time_line_deck = deck break if time_line_deck is None: - time_line_deck = TimeLineDeck(DeckFields.TIMELINE.value) + if self._timeline_deck is not None: + time_line_deck = self._timeline_deck + else: + time_line_deck = TimeLineDeck(DeckFields.TIMELINE.value) + self._timeline_deck = time_line_deck return time_line_deck - @property - def rendered_decks(self) -> List[str]: - return self._rendered_decks - def __getattr__(self, attr_name: str) -> typing.Any: """ This houses certain task specific context. For example in Spark, it houses the SparkSession, etc diff --git a/flytekit/core/python_function_task.py b/flytekit/core/python_function_task.py index c009786895..da63887502 100644 --- a/flytekit/core/python_function_task.py +++ b/flytekit/core/python_function_task.py @@ -359,11 +359,15 @@ def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_param source_code = inspect.getsource(self._task_function) from flytekit.deck.renderer import SourceCodeRenderer - source_code_deck = Deck(DeckFields.SOURCE_CODE.value) + source_code_deck = Deck( + DeckFields.SOURCE_CODE.value, auto_add_to_deck=DeckFields.SOURCE_CODE in self.decks + ) renderer = SourceCodeRenderer() source_code_deck.append(renderer.to_html(source_code)) - python_dependencies_deck = Deck(DeckFields.DEPENDENCIES.value) + python_dependencies_deck = Deck( + DeckFields.DEPENDENCIES.value, auto_add_to_deck=DeckFields.DEPENDENCIES in self.decks + ) renderer = PythonDependencyRenderer() python_dependencies_deck.append(renderer.to_html()) diff --git a/flytekit/deck/deck.py b/flytekit/deck/deck.py index 172f2abd25..2469596d55 100644 --- a/flytekit/deck/deck.py +++ b/flytekit/deck/deck.py @@ -65,10 +65,11 @@ def t2() -> Annotated[pd.DataFrame, TopFrameRenderer(10)]: return iris_df """ - def __init__(self, name: str, html: Optional[str] = ""): + def __init__(self, name: str, html: Optional[str] = "", auto_add_to_deck: bool = True): self._name = name self._html = html - FlyteContextManager.current_context().user_space_params.decks.append(self) + if auto_add_to_deck: + FlyteContextManager.current_context().user_space_params.decks.append(self) def append(self, html: str) -> "Deck": assert isinstance(html, str) @@ -92,8 +93,8 @@ class TimeLineDeck(Deck): Instead, the complete data set is used to create a comprehensive visualization of the execution time of each part of the task. """ - def __init__(self, name: str, html: Optional[str] = ""): - super().__init__(name, html) + def __init__(self, name: str, html: Optional[str] = "", auto_add_to_deck: bool = False): + super().__init__(name, html, auto_add_to_deck) self.time_info = [] def append_time_info(self, info: dict): @@ -141,13 +142,7 @@ def _get_deck( Get flyte deck html string If ignore_jupyter is set to True, then it will return a str even in a jupyter environment. """ - deck_members = set([_field.value for _field in DeckFields]) - rendered_decks = new_user_params.rendered_decks - deck_map = { - deck.name: deck.html - for deck in new_user_params.decks - if deck.name in rendered_decks or deck.name not in deck_members - } + deck_map = {deck.name: deck.html for deck in new_user_params.decks} raw_html = get_deck_template().render(metadata=deck_map) if not ignore_jupyter and ipython_check(): diff --git a/tests/flytekit/unit/core/test_flyte_file.py b/tests/flytekit/unit/core/test_flyte_file.py index 108f8b394d..6e055ca399 100644 --- a/tests/flytekit/unit/core/test_flyte_file.py +++ b/tests/flytekit/unit/core/test_flyte_file.py @@ -261,7 +261,7 @@ def my_wf() -> FlyteFile: def test_file_handling_remote_file_handling(): SAMPLE_DATA = "https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv" - @task(enable_deck=False) + @task def t1() -> FlyteFile: return SAMPLE_DATA @@ -312,7 +312,7 @@ def my_wf() -> FlyteFile: def test_file_handling_remote_file_handling_flyte_file(): SAMPLE_DATA = "https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv" - @task(enable_deck=False) + @task def t1() -> FlyteFile: # Unlike the test above, this returns the remote path wrapped in a FlyteFile object return FlyteFile(SAMPLE_DATA) @@ -606,7 +606,7 @@ def test_for_downloading(): @pytest.mark.sandbox_test def test_file_open_things(): - @task(enable_deck=False) + @task def write_this_file_to_s3() -> FlyteFile: ctx = FlyteContextManager.current_context() r = ctx.file_access.get_random_string() @@ -614,7 +614,7 @@ def write_this_file_to_s3() -> FlyteFile: ctx.file_access.put(__file__, dest) return FlyteFile(path=dest) - @task(enable_deck=False) + @task def copy_file(ff: FlyteFile) -> FlyteFile: new_file = FlyteFile.new_remote_file(ff.remote_path) with ff.open("r") as r: @@ -622,7 +622,7 @@ def copy_file(ff: FlyteFile) -> FlyteFile: w.write(r.read()) return new_file - @task(enable_deck=False) + @task def print_file(ff: FlyteFile): with open(ff, "r") as fh: print(len(fh.readlines())) diff --git a/tests/flytekit/unit/core/test_utils.py b/tests/flytekit/unit/core/test_utils.py index ca0d07565d..4880369097 100644 --- a/tests/flytekit/unit/core/test_utils.py +++ b/tests/flytekit/unit/core/test_utils.py @@ -33,9 +33,12 @@ def test_timeit(): ctx = FlyteContextManager.current_context() ctx.user_space_params._decks = [] + from flytekit.deck.deck import DeckFields + with timeit("Set disable_deck to False"): kwargs = {} kwargs["disable_deck"] = False + kwargs["decks"] = (DeckFields.TIMELINE.value,) ctx = FlyteContextManager.current_context() time_info_list = ctx.user_space_params.timeline_deck.time_info diff --git a/tests/flytekit/unit/deck/test_deck.py b/tests/flytekit/unit/deck/test_deck.py index 19e37eb9a4..aa966bb785 100644 --- a/tests/flytekit/unit/deck/test_deck.py +++ b/tests/flytekit/unit/deck/test_deck.py @@ -45,15 +45,15 @@ def test_timeline_deck(): assert timeline_deck.name == "Timeline" assert len(timeline_deck.time_info) == 1 assert timeline_deck.time_info[0] == time_info - assert len(ctx.user_space_params.decks) == 1 + assert len(ctx.user_space_params.decks) == 0 @pytest.mark.parametrize( "disable_deck,expected_decks", [ - (None, 1), # time line deck - (False, 3), # time line deck + source code deck + python dependency deck - (True, 1), # time line deck + (None, 0), + (False, 2), # source code deck + python dependency deck + (True, 0), ], ) def test_deck_for_task(disable_deck, expected_decks): @@ -74,14 +74,14 @@ def t1(a: int) -> str: @pytest.mark.parametrize( "decks,enable_deck,expected_decks", [ - ((), True, 3), # time line deck + source code deck + python dependency deck - ((DeckFields.INPUT.value), False, 1), # time line deck + ((), True, 0), + ((DeckFields.INPUT.value), False, 0), ( (DeckFields.OUTPUT.value, DeckFields.INPUT.value, DeckFields.TIMELINE.value, DeckFields.DEPENDENCIES.value), True, - 5, # time line deck + source code deck + dependency + input and output decks + 4, # time line deck + dependency + input and output decks ), - (None, None, 1), # time line deck + source code deck + (None, True, 2), # source code deck + python dependency deck ], ) @mock.patch("flytekit.deck.deck._output_deck") @@ -100,8 +100,6 @@ def t1(a: int) -> str: t1(a=3) assert len(ctx.user_space_params.decks) == expected_decks - if enable_deck is True: - assert len(_output_deck.call_args[0][1].rendered_decks) == len(decks) @pytest.mark.parametrize( @@ -132,21 +130,21 @@ def t1(a: int) -> str: @pytest.mark.parametrize( "enable_deck,disable_deck, expected_decks, expect_error", [ - (None, None, 2, False), # default deck and time line deck + (None, None, 1, False), # default deck ( None, False, - 4, + 3, False, - ), # default deck and time line deck + source code deck + python dependency deck - (None, True, 2, False), # default deck and time line deck + ), # default deck + source code deck + python dependency deck + (None, True, 1, False), # default deck ( True, None, - 4, + 3, False, - ), # default deck and time line deck + source code deck + python dependency deck - (False, None, 2, False), # default deck and time line deck + ), # default deck + source code deck + python dependency deck + (False, None, 1, False), # default deck (True, True, -1, True), # Set both disable_deck and enable_deck to True and confirm that it fails (False, False, -1, True), # Set both disable_deck and enable_deck to False and confirm that it fails ], From a1886b3b3aa3c8a96c551a61cdcae0d4c0070bde Mon Sep 17 00:00:00 2001 From: novahow Date: Wed, 10 Apr 2024 04:16:26 +0800 Subject: [PATCH 07/15] fix UI Signed-off-by: novahow --- flytekit/core/base_task.py | 33 +++++++++++++++------------ flytekit/core/context_manager.py | 2 +- flytekit/core/python_function_task.py | 20 ++++++++-------- flytekit/core/task.py | 2 +- flytekit/deck/deck.py | 10 ++++---- 5 files changed, 34 insertions(+), 33 deletions(-) diff --git a/flytekit/core/base_task.py b/flytekit/core/base_task.py index 3069b9371c..5d363ba550 100644 --- a/flytekit/core/base_task.py +++ b/flytekit/core/base_task.py @@ -491,15 +491,6 @@ def __init__( self._python_interface = interface if interface else Interface() self._environment = environment if environment else {} self._task_config = task_config - self._decks = list(decks) if (decks is not None and (enable_deck is True or disable_deck is False)) else [] - - deck_members = set([_field.value for _field in DeckFields]) - # enumerate additional decks, check if any of them are invalid - for deck in self._decks: - if deck not in deck_members: - raise ValueError( - f"Element {deck} from decks param is not a valid deck field. Please use one of {deck_members}" - ) # first we resolve the conflict between params regarding decks, if any two of [disable_deck, enable_deck] # are set, we raise an error @@ -520,6 +511,16 @@ def __init__( else: self._disable_deck = True + self._decks = list(decks) if (decks is not None and self.disable_deck is False) else [] + + deck_members = set([_field.value for _field in DeckFields]) + # enumerate additional decks, check if any of them are invalid + for deck in self._decks: + if deck not in deck_members: + raise ValueError( + f"Element {deck} from decks param is not a valid deck field. Please use one of {deck_members}" + ) + if self._python_interface.docstring: if self.docs is None: self._docs = Documentation( @@ -664,13 +665,15 @@ def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_param INPUT = DeckFields.INPUT OUTPUT = DeckFields.OUTPUT - input_deck = Deck(INPUT.value, auto_add_to_deck=DeckFields.INPUT in self.decks) - for k, v in native_inputs.items(): - input_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_input_var(k, v))) + if DeckFields.INPUT in self.decks: + input_deck = Deck(INPUT.value) + for k, v in native_inputs.items(): + input_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_input_var(k, v))) - output_deck = Deck(OUTPUT.value, auto_add_to_deck=DeckFields.OUTPUT in self.decks) - for k, v in native_outputs_as_map.items(): - output_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_output_var(k, v))) + if DeckFields.OUTPUT in self.decks: + output_deck = Deck(OUTPUT.value) + for k, v in native_outputs_as_map.items(): + output_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_output_var(k, v))) if ctx.execution_state and ctx.execution_state.is_local_execution(): # When we run the workflow remotely, flytekit outputs decks at the end of _dispatch_execute diff --git a/flytekit/core/context_manager.py b/flytekit/core/context_manager.py index 0802218a2b..67da40315e 100644 --- a/flytekit/core/context_manager.py +++ b/flytekit/core/context_manager.py @@ -289,7 +289,7 @@ def timeline_deck(self) -> "TimeLineDeck": # type: ignore if self._timeline_deck is not None: time_line_deck = self._timeline_deck else: - time_line_deck = TimeLineDeck(DeckFields.TIMELINE.value) + time_line_deck = TimeLineDeck(DeckFields.TIMELINE.value, auto_add_to_deck=False) self._timeline_deck = time_line_deck return time_line_deck diff --git a/flytekit/core/python_function_task.py b/flytekit/core/python_function_task.py index da63887502..54179e8e71 100644 --- a/flytekit/core/python_function_task.py +++ b/flytekit/core/python_function_task.py @@ -359,16 +359,14 @@ def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_param source_code = inspect.getsource(self._task_function) from flytekit.deck.renderer import SourceCodeRenderer - source_code_deck = Deck( - DeckFields.SOURCE_CODE.value, auto_add_to_deck=DeckFields.SOURCE_CODE in self.decks - ) - renderer = SourceCodeRenderer() - source_code_deck.append(renderer.to_html(source_code)) - - python_dependencies_deck = Deck( - DeckFields.DEPENDENCIES.value, auto_add_to_deck=DeckFields.DEPENDENCIES in self.decks - ) - renderer = PythonDependencyRenderer() - python_dependencies_deck.append(renderer.to_html()) + if DeckFields.SOURCE_CODE in self.decks: + source_code_deck = Deck(DeckFields.SOURCE_CODE.value) + renderer = SourceCodeRenderer() + source_code_deck.append(renderer.to_html(source_code)) + + if DeckFields.DEPENDENCIES in self.decks: + python_dependencies_deck = Deck(DeckFields.DEPENDENCIES.value) + renderer = PythonDependencyRenderer() + python_dependencies_deck.append(renderer.to_html()) return super()._write_decks(native_inputs, native_outputs_as_map, ctx, new_user_params) diff --git a/flytekit/core/task.py b/flytekit/core/task.py index 8b9282339d..21dfb9a556 100644 --- a/flytekit/core/task.py +++ b/flytekit/core/task.py @@ -191,7 +191,7 @@ def task( docs: Optional[Documentation] = None, disable_deck: Optional[bool] = None, enable_deck: Optional[bool] = None, - decks: Optional[Tuple[str, ...]] = ("source_code", "dependencies"), + decks: Optional[Tuple[str, ...]] = ("Source Code", "Dependencies"), pod_template: Optional["PodTemplate"] = None, pod_template_name: Optional[str] = None, accelerator: Optional[BaseAccelerator] = None, diff --git a/flytekit/deck/deck.py b/flytekit/deck/deck.py index 2469596d55..9ef5123ad1 100644 --- a/flytekit/deck/deck.py +++ b/flytekit/deck/deck.py @@ -16,11 +16,11 @@ class DeckFields(str, enum.Enum): DeckFields is used to specify the fields that will be rendered in the deck. """ - INPUT = "input" - OUTPUT = "output" - SOURCE_CODE = "source_code" + INPUT = "Input" + OUTPUT = "Output" + SOURCE_CODE = "Source Code" TIMELINE = "Timeline" - DEPENDENCIES = "dependencies" + DEPENDENCIES = "Dependencies" class Deck: @@ -93,7 +93,7 @@ class TimeLineDeck(Deck): Instead, the complete data set is used to create a comprehensive visualization of the execution time of each part of the task. """ - def __init__(self, name: str, html: Optional[str] = "", auto_add_to_deck: bool = False): + def __init__(self, name: str, html: Optional[str] = "", auto_add_to_deck: bool = True): super().__init__(name, html, auto_add_to_deck) self.time_info = [] From 5be382f671f6231f79e57cbb60872042bbbf95fa Mon Sep 17 00:00:00 2001 From: novahow Date: Tue, 30 Apr 2024 22:12:48 +0800 Subject: [PATCH 08/15] fix timelinedeck test multiple time_info Signed-off-by: novahow --- flytekit/core/base_task.py | 6 +++--- flytekit/core/python_function_task.py | 3 +-- flytekit/deck/__init__.py | 2 +- tests/flytekit/unit/core/test_utils.py | 2 +- tests/flytekit/unit/deck/test_deck.py | 5 +++-- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/flytekit/core/base_task.py b/flytekit/core/base_task.py index 5d363ba550..19c83e95e1 100644 --- a/flytekit/core/base_task.py +++ b/flytekit/core/base_task.py @@ -68,7 +68,7 @@ from flytekit.core.tracker import TrackedInstance from flytekit.core.type_engine import TypeEngine, TypeTransformerFailedError from flytekit.core.utils import timeit -from flytekit.deck.deck import DeckFields +from flytekit.deck import DeckFields from flytekit.loggers import logger from flytekit.models import dynamic_job as _dynamic_job from flytekit.models import interface as _interface_models @@ -463,7 +463,7 @@ def __init__( environment: Optional[Dict[str, str]] = None, disable_deck: Optional[bool] = None, enable_deck: Optional[bool] = None, - decks: Optional[Tuple[str, ...]] = None, + decks: Optional[Tuple[str, ...]] = ("Source Code", "Dependencies"), **kwargs, ): """ @@ -480,7 +480,7 @@ def __init__( disable_deck (bool): (deprecated) If true, this task will not output deck html file enable_deck (bool): If true, this task will output deck html file decks (Tuple[str]): Tuple of decks to be - generated for this task. Valid values can be selected from fields of ``flytekit.deck.deck.DeckFields`` enum + generated for this task. Valid values can be selected from fields of ``flytekit.deck.DeckFields`` enum """ super().__init__( task_type=task_type, diff --git a/flytekit/core/python_function_task.py b/flytekit/core/python_function_task.py index 54179e8e71..3190f47807 100644 --- a/flytekit/core/python_function_task.py +++ b/flytekit/core/python_function_task.py @@ -350,8 +350,7 @@ def dynamic_execute(self, task_function: Callable, **kwargs) -> Any: def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_params): if self._disable_deck is False: - from flytekit.deck import Deck - from flytekit.deck.deck import DeckFields + from flytekit.deck import Deck, DeckFields from flytekit.deck.renderer import PythonDependencyRenderer # These errors are raised if the source code can not be retrieved diff --git a/flytekit/deck/__init__.py b/flytekit/deck/__init__.py index 610f92da15..3059247bd3 100644 --- a/flytekit/deck/__init__.py +++ b/flytekit/deck/__init__.py @@ -16,5 +16,5 @@ SourceCodeRenderer """ -from .deck import Deck +from .deck import Deck, DeckFields from .renderer import MarkdownRenderer, SourceCodeRenderer, TopFrameRenderer diff --git a/tests/flytekit/unit/core/test_utils.py b/tests/flytekit/unit/core/test_utils.py index 4880369097..f9e886139f 100644 --- a/tests/flytekit/unit/core/test_utils.py +++ b/tests/flytekit/unit/core/test_utils.py @@ -33,7 +33,7 @@ def test_timeit(): ctx = FlyteContextManager.current_context() ctx.user_space_params._decks = [] - from flytekit.deck.deck import DeckFields + from flytekit.deck import DeckFields with timeit("Set disable_deck to False"): kwargs = {} diff --git a/tests/flytekit/unit/deck/test_deck.py b/tests/flytekit/unit/deck/test_deck.py index aa966bb785..f045046ebf 100644 --- a/tests/flytekit/unit/deck/test_deck.py +++ b/tests/flytekit/unit/deck/test_deck.py @@ -7,8 +7,8 @@ import flytekit from flytekit import Deck, FlyteContextManager, task -from flytekit.deck import MarkdownRenderer, SourceCodeRenderer, TopFrameRenderer -from flytekit.deck.deck import DeckFields, _output_deck +from flytekit.deck import DeckFields, MarkdownRenderer, SourceCodeRenderer, TopFrameRenderer +from flytekit.deck.deck import _output_deck from flytekit.deck.renderer import PythonDependencyRenderer @@ -40,6 +40,7 @@ def test_timeline_deck(): ) ctx = FlyteContextManager.current_context() ctx.user_space_params._decks = [] + ctx.user_space_params._timeline_deck = None timeline_deck = ctx.user_space_params.timeline_deck timeline_deck.append_time_info(time_info) assert timeline_deck.name == "Timeline" From d2c9c9bceab81a347d4782cfe403636b79feec5a Mon Sep 17 00:00:00 2001 From: novahow Date: Mon, 6 May 2024 21:29:57 +0800 Subject: [PATCH 09/15] nit Signed-off-by: novahow --- flytekit/core/base_task.py | 20 ++++++++++---------- flytekit/core/context_manager.py | 4 ++-- flytekit/core/python_function_task.py | 10 +++++----- flytekit/core/task.py | 3 ++- flytekit/deck/__init__.py | 2 +- flytekit/deck/deck.py | 4 ++-- tests/flytekit/unit/core/test_utils.py | 4 ++-- tests/flytekit/unit/deck/test_deck.py | 8 ++++---- 8 files changed, 28 insertions(+), 27 deletions(-) diff --git a/flytekit/core/base_task.py b/flytekit/core/base_task.py index 19c83e95e1..74980c1cec 100644 --- a/flytekit/core/base_task.py +++ b/flytekit/core/base_task.py @@ -68,7 +68,7 @@ from flytekit.core.tracker import TrackedInstance from flytekit.core.type_engine import TypeEngine, TypeTransformerFailedError from flytekit.core.utils import timeit -from flytekit.deck import DeckFields +from flytekit.deck import DeckField from flytekit.loggers import logger from flytekit.models import dynamic_job as _dynamic_job from flytekit.models import interface as _interface_models @@ -463,7 +463,7 @@ def __init__( environment: Optional[Dict[str, str]] = None, disable_deck: Optional[bool] = None, enable_deck: Optional[bool] = None, - decks: Optional[Tuple[str, ...]] = ("Source Code", "Dependencies"), + decks: Optional[Tuple[str, ...]] = (DeckField.SOURCE_CODE.value, DeckField.DEPENDENCIES.value), **kwargs, ): """ @@ -480,7 +480,7 @@ def __init__( disable_deck (bool): (deprecated) If true, this task will not output deck html file enable_deck (bool): If true, this task will output deck html file decks (Tuple[str]): Tuple of decks to be - generated for this task. Valid values can be selected from fields of ``flytekit.deck.DeckFields`` enum + generated for this task. Valid values can be selected from fields of ``flytekit.deck.DeckField`` enum """ super().__init__( task_type=task_type, @@ -513,7 +513,7 @@ def __init__( self._decks = list(decks) if (decks is not None and self.disable_deck is False) else [] - deck_members = set([_field.value for _field in DeckFields]) + deck_members = set([_field.value for _field in DeckField]) # enumerate additional decks, check if any of them are invalid for deck in self._decks: if deck not in deck_members: @@ -660,17 +660,17 @@ def _output_to_literal_map(self, native_outputs: Dict[int, Any], ctx: FlyteConte def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_params): if self._disable_deck is False: - from flytekit.deck.deck import Deck, DeckFields, _output_deck + from flytekit.deck.deck import Deck, DeckField, _output_deck - INPUT = DeckFields.INPUT - OUTPUT = DeckFields.OUTPUT + INPUT = DeckField.INPUT + OUTPUT = DeckField.OUTPUT - if DeckFields.INPUT in self.decks: + if DeckField.INPUT in self.decks: input_deck = Deck(INPUT.value) for k, v in native_inputs.items(): input_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_input_var(k, v))) - if DeckFields.OUTPUT in self.decks: + if DeckField.OUTPUT in self.decks: output_deck = Deck(OUTPUT.value) for k, v in native_outputs_as_map.items(): output_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_output_var(k, v))) @@ -698,7 +698,7 @@ def dispatch_execute( may be none * ``DynamicJobSpec`` is returned when a dynamic workflow is executed """ - if DeckFields.TIMELINE.value in self.decks and ctx.user_space_params is not None: + if DeckField.TIMELINE.value in self.decks and ctx.user_space_params is not None: ctx.user_space_params.decks.append(ctx.user_space_params.timeline_deck) # Invoked before the task is executed new_user_params = self.pre_execute(ctx.user_space_params) diff --git a/flytekit/core/context_manager.py b/flytekit/core/context_manager.py index 67da40315e..0db74b6770 100644 --- a/flytekit/core/context_manager.py +++ b/flytekit/core/context_manager.py @@ -278,7 +278,7 @@ def default_deck(self) -> Deck: @property def timeline_deck(self) -> "TimeLineDeck": # type: ignore - from flytekit.deck.deck import DeckFields, TimeLineDeck + from flytekit.deck.deck import DeckField, TimeLineDeck time_line_deck = None for deck in self.decks: @@ -289,7 +289,7 @@ def timeline_deck(self) -> "TimeLineDeck": # type: ignore if self._timeline_deck is not None: time_line_deck = self._timeline_deck else: - time_line_deck = TimeLineDeck(DeckFields.TIMELINE.value, auto_add_to_deck=False) + time_line_deck = TimeLineDeck(DeckField.TIMELINE.value, auto_add_to_deck=False) self._timeline_deck = time_line_deck return time_line_deck diff --git a/flytekit/core/python_function_task.py b/flytekit/core/python_function_task.py index 3190f47807..4066e95837 100644 --- a/flytekit/core/python_function_task.py +++ b/flytekit/core/python_function_task.py @@ -350,7 +350,7 @@ def dynamic_execute(self, task_function: Callable, **kwargs) -> Any: def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_params): if self._disable_deck is False: - from flytekit.deck import Deck, DeckFields + from flytekit.deck import Deck, DeckField from flytekit.deck.renderer import PythonDependencyRenderer # These errors are raised if the source code can not be retrieved @@ -358,13 +358,13 @@ def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_param source_code = inspect.getsource(self._task_function) from flytekit.deck.renderer import SourceCodeRenderer - if DeckFields.SOURCE_CODE in self.decks: - source_code_deck = Deck(DeckFields.SOURCE_CODE.value) + if DeckField.SOURCE_CODE in self.decks: + source_code_deck = Deck(DeckField.SOURCE_CODE.value) renderer = SourceCodeRenderer() source_code_deck.append(renderer.to_html(source_code)) - if DeckFields.DEPENDENCIES in self.decks: - python_dependencies_deck = Deck(DeckFields.DEPENDENCIES.value) + if DeckField.DEPENDENCIES in self.decks: + python_dependencies_deck = Deck(DeckField.DEPENDENCIES.value) renderer = PythonDependencyRenderer() python_dependencies_deck.append(renderer.to_html()) diff --git a/flytekit/core/task.py b/flytekit/core/task.py index 21dfb9a556..c7cb64b8e7 100644 --- a/flytekit/core/task.py +++ b/flytekit/core/task.py @@ -12,6 +12,7 @@ from flytekit.core.python_function_task import PythonFunctionTask from flytekit.core.reference_entity import ReferenceEntity, TaskReference from flytekit.core.resources import Resources +from flytekit.deck import DeckField from flytekit.extras.accelerators import BaseAccelerator from flytekit.image_spec.image_spec import ImageSpec from flytekit.models.documentation import Documentation @@ -191,7 +192,7 @@ def task( docs: Optional[Documentation] = None, disable_deck: Optional[bool] = None, enable_deck: Optional[bool] = None, - decks: Optional[Tuple[str, ...]] = ("Source Code", "Dependencies"), + decks: Optional[Tuple[str, ...]] = (DeckField.SOURCE_CODE.value, DeckField.DEPENDENCIES.value), pod_template: Optional["PodTemplate"] = None, pod_template_name: Optional[str] = None, accelerator: Optional[BaseAccelerator] = None, diff --git a/flytekit/deck/__init__.py b/flytekit/deck/__init__.py index 3059247bd3..06144c39ba 100644 --- a/flytekit/deck/__init__.py +++ b/flytekit/deck/__init__.py @@ -16,5 +16,5 @@ SourceCodeRenderer """ -from .deck import Deck, DeckFields +from .deck import Deck, DeckField from .renderer import MarkdownRenderer, SourceCodeRenderer, TopFrameRenderer diff --git a/flytekit/deck/deck.py b/flytekit/deck/deck.py index 9ef5123ad1..b39c874101 100644 --- a/flytekit/deck/deck.py +++ b/flytekit/deck/deck.py @@ -11,9 +11,9 @@ DECK_FILE_NAME = "deck.html" -class DeckFields(str, enum.Enum): +class DeckField(str, enum.Enum): """ - DeckFields is used to specify the fields that will be rendered in the deck. + DeckField is used to specify the fields that will be rendered in the deck. """ INPUT = "Input" diff --git a/tests/flytekit/unit/core/test_utils.py b/tests/flytekit/unit/core/test_utils.py index f9e886139f..7afa97dff6 100644 --- a/tests/flytekit/unit/core/test_utils.py +++ b/tests/flytekit/unit/core/test_utils.py @@ -33,12 +33,12 @@ def test_timeit(): ctx = FlyteContextManager.current_context() ctx.user_space_params._decks = [] - from flytekit.deck import DeckFields + from flytekit.deck import DeckField with timeit("Set disable_deck to False"): kwargs = {} kwargs["disable_deck"] = False - kwargs["decks"] = (DeckFields.TIMELINE.value,) + kwargs["decks"] = (DeckField.TIMELINE.value,) ctx = FlyteContextManager.current_context() time_info_list = ctx.user_space_params.timeline_deck.time_info diff --git a/tests/flytekit/unit/deck/test_deck.py b/tests/flytekit/unit/deck/test_deck.py index f045046ebf..035b182901 100644 --- a/tests/flytekit/unit/deck/test_deck.py +++ b/tests/flytekit/unit/deck/test_deck.py @@ -7,7 +7,7 @@ import flytekit from flytekit import Deck, FlyteContextManager, task -from flytekit.deck import DeckFields, MarkdownRenderer, SourceCodeRenderer, TopFrameRenderer +from flytekit.deck import DeckField, MarkdownRenderer, SourceCodeRenderer, TopFrameRenderer from flytekit.deck.deck import _output_deck from flytekit.deck.renderer import PythonDependencyRenderer @@ -76,9 +76,9 @@ def t1(a: int) -> str: "decks,enable_deck,expected_decks", [ ((), True, 0), - ((DeckFields.INPUT.value), False, 0), + ((DeckField.INPUT.value), False, 0), ( - (DeckFields.OUTPUT.value, DeckFields.INPUT.value, DeckFields.TIMELINE.value, DeckFields.DEPENDENCIES.value), + (DeckField.OUTPUT.value, DeckField.INPUT.value, DeckField.TIMELINE.value, DeckField.DEPENDENCIES.value), True, 4, # time line deck + dependency + input and output decks ), @@ -107,7 +107,7 @@ def t1(a: int) -> str: "decks,enable_deck,disable_deck", [ (None, True, False), - (("WrongDeck", DeckFields.INPUT.value, DeckFields.OUTPUT.value), True, None), # WrongDeck is not a valid field + (("WrongDeck", DeckField.INPUT.value, DeckField.OUTPUT.value), True, None), # WrongDeck is not a valid field ], ) def test_invalid_deck_params(decks, enable_deck, disable_deck): From c1b73f8b9b01c49115d44d7a19f1d12fe301b5be Mon Sep 17 00:00:00 2001 From: novahow Date: Tue, 7 May 2024 18:14:39 +0800 Subject: [PATCH 10/15] nit with enum Signed-off-by: novahow --- flytekit/core/base_task.py | 8 ++++---- flytekit/core/task.py | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/flytekit/core/base_task.py b/flytekit/core/base_task.py index 74980c1cec..d1a1a6aafe 100644 --- a/flytekit/core/base_task.py +++ b/flytekit/core/base_task.py @@ -463,7 +463,7 @@ def __init__( environment: Optional[Dict[str, str]] = None, disable_deck: Optional[bool] = None, enable_deck: Optional[bool] = None, - decks: Optional[Tuple[str, ...]] = (DeckField.SOURCE_CODE.value, DeckField.DEPENDENCIES.value), + decks: Optional[Tuple[DeckField, ...]] = (DeckField.SOURCE_CODE, DeckField.DEPENDENCIES), **kwargs, ): """ @@ -479,7 +479,7 @@ def __init__( execution of the task. Supplied as a dictionary of key/value pairs disable_deck (bool): (deprecated) If true, this task will not output deck html file enable_deck (bool): If true, this task will output deck html file - decks (Tuple[str]): Tuple of decks to be + decks (Tuple[DeckField]): Tuple of decks to be generated for this task. Valid values can be selected from fields of ``flytekit.deck.DeckField`` enum """ super().__init__( @@ -513,7 +513,7 @@ def __init__( self._decks = list(decks) if (decks is not None and self.disable_deck is False) else [] - deck_members = set([_field.value for _field in DeckField]) + deck_members = set([_field for _field in DeckField]) # enumerate additional decks, check if any of them are invalid for deck in self._decks: if deck not in deck_members: @@ -811,7 +811,7 @@ def disable_deck(self) -> bool: return self._disable_deck @property - def decks(self) -> List[str]: + def decks(self) -> List[DeckField]: """ If not empty, this task will output deck html file for the specified decks """ diff --git a/flytekit/core/task.py b/flytekit/core/task.py index c7cb64b8e7..705ead31cf 100644 --- a/flytekit/core/task.py +++ b/flytekit/core/task.py @@ -115,7 +115,7 @@ def task( docs: Optional[Documentation] = ..., disable_deck: Optional[bool] = ..., enable_deck: Optional[bool] = ..., - decks: Optional[Tuple[str, ...]] = ..., + decks: Optional[Tuple[DeckField, ...]] = ..., pod_template: Optional["PodTemplate"] = ..., pod_template_name: Optional[str] = ..., accelerator: Optional[BaseAccelerator] = ..., @@ -154,7 +154,7 @@ def task( docs: Optional[Documentation] = ..., disable_deck: Optional[bool] = ..., enable_deck: Optional[bool] = ..., - decks: Optional[Tuple[str, ...]] = ..., + decks: Optional[Tuple[DeckField, ...]] = ..., pod_template: Optional["PodTemplate"] = ..., pod_template_name: Optional[str] = ..., accelerator: Optional[BaseAccelerator] = ..., @@ -192,7 +192,7 @@ def task( docs: Optional[Documentation] = None, disable_deck: Optional[bool] = None, enable_deck: Optional[bool] = None, - decks: Optional[Tuple[str, ...]] = (DeckField.SOURCE_CODE.value, DeckField.DEPENDENCIES.value), + decks: Optional[Tuple[DeckField, ...]] = (DeckField.SOURCE_CODE, DeckField.DEPENDENCIES), pod_template: Optional["PodTemplate"] = None, pod_template_name: Optional[str] = None, accelerator: Optional[BaseAccelerator] = None, From 2d4fb511919940270fb681cf56e0be9a3429c4f6 Mon Sep 17 00:00:00 2001 From: novahow Date: Tue, 28 May 2024 04:59:10 +0800 Subject: [PATCH 11/15] nit deck_fields Signed-off-by: novahow --- flytekit/core/base_task.py | 20 ++++++++++---------- flytekit/core/python_function_task.py | 4 ++-- flytekit/core/task.py | 10 +++++----- tests/flytekit/unit/core/test_utils.py | 2 +- tests/flytekit/unit/deck/test_deck.py | 16 ++++++++-------- 5 files changed, 26 insertions(+), 26 deletions(-) diff --git a/flytekit/core/base_task.py b/flytekit/core/base_task.py index d1a1a6aafe..4561834f6e 100644 --- a/flytekit/core/base_task.py +++ b/flytekit/core/base_task.py @@ -463,7 +463,7 @@ def __init__( environment: Optional[Dict[str, str]] = None, disable_deck: Optional[bool] = None, enable_deck: Optional[bool] = None, - decks: Optional[Tuple[DeckField, ...]] = (DeckField.SOURCE_CODE, DeckField.DEPENDENCIES), + deck_fields: Optional[Tuple[DeckField, ...]] = (DeckField.SOURCE_CODE, DeckField.DEPENDENCIES), **kwargs, ): """ @@ -479,7 +479,7 @@ def __init__( execution of the task. Supplied as a dictionary of key/value pairs disable_deck (bool): (deprecated) If true, this task will not output deck html file enable_deck (bool): If true, this task will output deck html file - decks (Tuple[DeckField]): Tuple of decks to be + deck_fields (Tuple[DeckField]): Tuple of decks to be generated for this task. Valid values can be selected from fields of ``flytekit.deck.DeckField`` enum """ super().__init__( @@ -511,14 +511,14 @@ def __init__( else: self._disable_deck = True - self._decks = list(decks) if (decks is not None and self.disable_deck is False) else [] + self._deck_fields = list(deck_fields) if (deck_fields is not None and self.disable_deck is False) else [] deck_members = set([_field for _field in DeckField]) # enumerate additional decks, check if any of them are invalid - for deck in self._decks: + for deck in self._deck_fields: if deck not in deck_members: raise ValueError( - f"Element {deck} from decks param is not a valid deck field. Please use one of {deck_members}" + f"Element {deck} from deck_fields param is not a valid deck field. Please use one of {deck_members}" ) if self._python_interface.docstring: @@ -665,12 +665,12 @@ def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_param INPUT = DeckField.INPUT OUTPUT = DeckField.OUTPUT - if DeckField.INPUT in self.decks: + if DeckField.INPUT in self.deck_fields: input_deck = Deck(INPUT.value) for k, v in native_inputs.items(): input_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_input_var(k, v))) - if DeckField.OUTPUT in self.decks: + if DeckField.OUTPUT in self.deck_fields: output_deck = Deck(OUTPUT.value) for k, v in native_outputs_as_map.items(): output_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_output_var(k, v))) @@ -698,7 +698,7 @@ def dispatch_execute( may be none * ``DynamicJobSpec`` is returned when a dynamic workflow is executed """ - if DeckField.TIMELINE.value in self.decks and ctx.user_space_params is not None: + if DeckField.TIMELINE.value in self.deck_fields and ctx.user_space_params is not None: ctx.user_space_params.decks.append(ctx.user_space_params.timeline_deck) # Invoked before the task is executed new_user_params = self.pre_execute(ctx.user_space_params) @@ -811,11 +811,11 @@ def disable_deck(self) -> bool: return self._disable_deck @property - def decks(self) -> List[DeckField]: + def deck_fields(self) -> List[DeckField]: """ If not empty, this task will output deck html file for the specified decks """ - return self._decks + return self._deck_fields class TaskResolverMixin(object): diff --git a/flytekit/core/python_function_task.py b/flytekit/core/python_function_task.py index 4066e95837..19e8883a4f 100644 --- a/flytekit/core/python_function_task.py +++ b/flytekit/core/python_function_task.py @@ -358,12 +358,12 @@ def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_param source_code = inspect.getsource(self._task_function) from flytekit.deck.renderer import SourceCodeRenderer - if DeckField.SOURCE_CODE in self.decks: + if DeckField.SOURCE_CODE in self.deck_fields: source_code_deck = Deck(DeckField.SOURCE_CODE.value) renderer = SourceCodeRenderer() source_code_deck.append(renderer.to_html(source_code)) - if DeckField.DEPENDENCIES in self.decks: + if DeckField.DEPENDENCIES in self.deck_fields: python_dependencies_deck = Deck(DeckField.DEPENDENCIES.value) renderer = PythonDependencyRenderer() python_dependencies_deck.append(renderer.to_html()) diff --git a/flytekit/core/task.py b/flytekit/core/task.py index 705ead31cf..4597a0612d 100644 --- a/flytekit/core/task.py +++ b/flytekit/core/task.py @@ -115,7 +115,7 @@ def task( docs: Optional[Documentation] = ..., disable_deck: Optional[bool] = ..., enable_deck: Optional[bool] = ..., - decks: Optional[Tuple[DeckField, ...]] = ..., + deck_fields: Optional[Tuple[DeckField, ...]] = ..., pod_template: Optional["PodTemplate"] = ..., pod_template_name: Optional[str] = ..., accelerator: Optional[BaseAccelerator] = ..., @@ -154,7 +154,7 @@ def task( docs: Optional[Documentation] = ..., disable_deck: Optional[bool] = ..., enable_deck: Optional[bool] = ..., - decks: Optional[Tuple[DeckField, ...]] = ..., + deck_fields: Optional[Tuple[DeckField, ...]] = ..., pod_template: Optional["PodTemplate"] = ..., pod_template_name: Optional[str] = ..., accelerator: Optional[BaseAccelerator] = ..., @@ -192,7 +192,7 @@ def task( docs: Optional[Documentation] = None, disable_deck: Optional[bool] = None, enable_deck: Optional[bool] = None, - decks: Optional[Tuple[DeckField, ...]] = (DeckField.SOURCE_CODE, DeckField.DEPENDENCIES), + deck_fields: Optional[Tuple[DeckField, ...]] = (DeckField.SOURCE_CODE, DeckField.DEPENDENCIES), pod_template: Optional["PodTemplate"] = None, pod_template_name: Optional[str] = None, accelerator: Optional[BaseAccelerator] = None, @@ -313,7 +313,7 @@ def launch_dynamically(): :param task_resolver: Provide a custom task resolver. :param disable_deck: (deprecated) If true, this task will not output deck html file :param enable_deck: If true, this task will output deck html file - :param decks: If specified and enble_deck is True, this task will output deck html file with the fields specified in the list + :param deck_fields: If specified and enble_deck is True, this task will output deck html file with the fields specified in the tuple :param docs: Documentation about this task :param pod_template: Custom PodTemplate for this task. :param pod_template_name: The name of the existing PodTemplate resource which will be used in this task. @@ -346,7 +346,7 @@ def wrapper(fn: Callable[..., Any]) -> PythonFunctionTask[T]: task_resolver=task_resolver, disable_deck=disable_deck, enable_deck=enable_deck, - decks=decks, + deck_fields=deck_fields, docs=docs, pod_template=pod_template, pod_template_name=pod_template_name, diff --git a/tests/flytekit/unit/core/test_utils.py b/tests/flytekit/unit/core/test_utils.py index 7afa97dff6..3e9c42dba0 100644 --- a/tests/flytekit/unit/core/test_utils.py +++ b/tests/flytekit/unit/core/test_utils.py @@ -38,7 +38,7 @@ def test_timeit(): with timeit("Set disable_deck to False"): kwargs = {} kwargs["disable_deck"] = False - kwargs["decks"] = (DeckField.TIMELINE.value,) + kwargs["deck_fields"] = (DeckField.TIMELINE.value,) ctx = FlyteContextManager.current_context() time_info_list = ctx.user_space_params.timeline_deck.time_info diff --git a/tests/flytekit/unit/deck/test_deck.py b/tests/flytekit/unit/deck/test_deck.py index 035b182901..98096b4a2b 100644 --- a/tests/flytekit/unit/deck/test_deck.py +++ b/tests/flytekit/unit/deck/test_deck.py @@ -73,7 +73,7 @@ def t1(a: int) -> str: @pytest.mark.parametrize( - "decks,enable_deck,expected_decks", + "deck_fields,enable_deck,expected_decks", [ ((), True, 0), ((DeckField.INPUT.value), False, 0), @@ -86,12 +86,12 @@ def t1(a: int) -> str: ], ) @mock.patch("flytekit.deck.deck._output_deck") -def test_additional_deck_for_task(_output_deck, decks, enable_deck, expected_decks): +def test_additional_deck_for_task(_output_deck, deck_fields, enable_deck, expected_decks): ctx = FlyteContextManager.current_context() kwargs = {} - if decks is not None: - kwargs["decks"] = decks + if deck_fields is not None: + kwargs["deck_fields"] = deck_fields if enable_deck is not None: kwargs["enable_deck"] = enable_deck @@ -104,16 +104,16 @@ def t1(a: int) -> str: @pytest.mark.parametrize( - "decks,enable_deck,disable_deck", + "deck_fields,enable_deck,disable_deck", [ (None, True, False), (("WrongDeck", DeckField.INPUT.value, DeckField.OUTPUT.value), True, None), # WrongDeck is not a valid field ], ) -def test_invalid_deck_params(decks, enable_deck, disable_deck): +def test_invalid_deck_params(deck_fields, enable_deck, disable_deck): kwargs = {} - if decks is not None: - kwargs["decks"] = decks + if deck_fields is not None: + kwargs["deck_fields"] = deck_fields if enable_deck is not None: kwargs["enable_deck"] = enable_deck if disable_deck is not None: From d246d1930fa9cd654ef851b12c8bf41ec4123e6f Mon Sep 17 00:00:00 2001 From: novahow Date: Wed, 19 Jun 2024 11:51:31 +0800 Subject: [PATCH 12/15] enable all decks, remove plotly dep Signed-off-by: novahow --- flytekit/core/base_task.py | 8 ++++++- flytekit/core/task.py | 8 ++++++- flytekit/deck/deck.py | 7 +++++-- .../flytekitplugins/deck/renderer.py | 21 +++++++++++++++++-- plugins/flytekit-deck-standard/setup.py | 2 +- .../tests/test_renderer.py | 3 +++ tests/flytekit/unit/deck/test_deck.py | 12 +++++------ 7 files changed, 48 insertions(+), 13 deletions(-) diff --git a/flytekit/core/base_task.py b/flytekit/core/base_task.py index 4561834f6e..132cd6d474 100644 --- a/flytekit/core/base_task.py +++ b/flytekit/core/base_task.py @@ -463,7 +463,13 @@ def __init__( environment: Optional[Dict[str, str]] = None, disable_deck: Optional[bool] = None, enable_deck: Optional[bool] = None, - deck_fields: Optional[Tuple[DeckField, ...]] = (DeckField.SOURCE_CODE, DeckField.DEPENDENCIES), + deck_fields: Optional[Tuple[DeckField, ...]] = ( + DeckField.SOURCE_CODE, + DeckField.DEPENDENCIES, + DeckField.TIMELINE, + DeckField.INPUT, + DeckField.OUTPUT, + ), **kwargs, ): """ diff --git a/flytekit/core/task.py b/flytekit/core/task.py index 4597a0612d..4bd31dd3f9 100644 --- a/flytekit/core/task.py +++ b/flytekit/core/task.py @@ -192,7 +192,13 @@ def task( docs: Optional[Documentation] = None, disable_deck: Optional[bool] = None, enable_deck: Optional[bool] = None, - deck_fields: Optional[Tuple[DeckField, ...]] = (DeckField.SOURCE_CODE, DeckField.DEPENDENCIES), + deck_fields: Optional[Tuple[DeckField, ...]] = ( + DeckField.SOURCE_CODE, + DeckField.DEPENDENCIES, + DeckField.TIMELINE, + DeckField.INPUT, + DeckField.OUTPUT, + ), pod_template: Optional["PodTemplate"] = None, pod_template_name: Optional[str] = None, accelerator: Optional[BaseAccelerator] = None, diff --git a/flytekit/deck/deck.py b/flytekit/deck/deck.py index b39c874101..b8b13cd016 100644 --- a/flytekit/deck/deck.py +++ b/flytekit/deck/deck.py @@ -103,13 +103,16 @@ def append_time_info(self, info: dict): @property def html(self) -> str: + gantt_dep_installed = False try: - from flytekitplugins.deck.renderer import GanttChartRenderer, TableRenderer + from flytekitplugins.deck.renderer import TableRenderer except ImportError: warning_info = "Plugin 'flytekit-deck-standard' is not installed. To display time line, install the plugin in the image." logger.warning(warning_info) return warning_info + from flytekitplugins.deck.renderer import GanttChartRenderer, px_installed + gantt_dep_installed = px_installed if len(self.time_info) == 0: return "" @@ -127,7 +130,7 @@ def html(self) -> str: df["ProcessTime"] = df["ProcessTime"].apply(lambda time: "{:.6f}".format(time)) df["WallTime"] = df["WallTime"].apply(lambda time: "{:.6f}".format(time)) - gantt_chart_html = GanttChartRenderer().to_html(df) + gantt_chart_html = GanttChartRenderer().to_html(df) if gantt_dep_installed else "" time_table_html = TableRenderer().to_html( df[["Name", "WallTime", "ProcessTime"]], header_labels=["Name", "Wall Time(s)", "Process Time(s)"], diff --git a/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py b/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py index fbf05f0efe..17f51060ea 100644 --- a/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py +++ b/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py @@ -4,16 +4,27 @@ from flytekit import lazy_module from flytekit.types.file import FlyteFile +px_installed: bool = False if TYPE_CHECKING: import markdown import pandas as pd import PIL.Image - import plotly.express as px + + try: + import plotly.express as px + + px_installed = True + except ImportError: + pass else: pd = lazy_module("pandas") markdown = lazy_module("markdown") - px = lazy_module("plotly.express") PIL = lazy_module("PIL") + try: + px = lazy_module("plotly.express") + px_installed = True + except ImportError: + pass class SourceCodeRenderer: @@ -96,6 +107,8 @@ class BoxRenderer: # More detail, see https://plotly.com/python/box-plots/ def __init__(self, column_name): self._column_name = column_name + if not px_installed: + raise ImportError("plotly is not installed") def to_html(self, df: "pd.DataFrame") -> str: fig = px.box(df, y=self._column_name) @@ -186,6 +199,10 @@ class GanttChartRenderer: - "Name": string (the name of the task or event) """ + def __init__(self) -> None: + if not px_installed: + raise ImportError("plotly is not installed") + def to_html(self, df: pd.DataFrame, chart_width: Optional[int] = None) -> str: fig = px.timeline(df, x_start="Start", x_end="Finish", y="Name", color="Name", width=chart_width) diff --git a/plugins/flytekit-deck-standard/setup.py b/plugins/flytekit-deck-standard/setup.py index b0d2c4783d..494294d6db 100644 --- a/plugins/flytekit-deck-standard/setup.py +++ b/plugins/flytekit-deck-standard/setup.py @@ -7,7 +7,7 @@ plugin_requires = [ "flytekit", "markdown", - "plotly", + # "plotly", # ydata-profiling is not compatible with python 3.12 yet: https://github.com/ydataai/ydata-profiling/issues/1510 "ydata-profiling; python_version<'3.12'", "pandas", diff --git a/plugins/flytekit-deck-standard/tests/test_renderer.py b/plugins/flytekit-deck-standard/tests/test_renderer.py index dbe157cefc..44d3ab0259 100644 --- a/plugins/flytekit-deck-standard/tests/test_renderer.py +++ b/plugins/flytekit-deck-standard/tests/test_renderer.py @@ -12,6 +12,7 @@ MarkdownRenderer, SourceCodeRenderer, TableRenderer, + px_installed, ) from PIL import Image @@ -44,6 +45,7 @@ def test_markdown_renderer(): assert renderer.to_html(md_text) == markdown.markdown(md_text) +@pytest.mark.skipif(not px_installed, reason="Plotly is not installed") def test_box_renderer(): renderer = BoxRenderer("Name") assert "Plotlyconfig = {Mathjaxconfig: 'Local'}" in renderer.to_html(df).title() @@ -80,6 +82,7 @@ def test_table_renderer(): assert "Dataframe Table-Class" in renderer.to_html(time_info_df).title() +@pytest.mark.skipif(not px_installed, reason="Plotly is not installed") def test_gantt_chart_renderer(): renderer = GanttChartRenderer() assert "Plotlyconfig = {Mathjaxconfig: 'Local'}" in renderer.to_html(time_info_df).title() diff --git a/tests/flytekit/unit/deck/test_deck.py b/tests/flytekit/unit/deck/test_deck.py index 98096b4a2b..c19318e9b8 100644 --- a/tests/flytekit/unit/deck/test_deck.py +++ b/tests/flytekit/unit/deck/test_deck.py @@ -53,7 +53,7 @@ def test_timeline_deck(): "disable_deck,expected_decks", [ (None, 0), - (False, 2), # source code deck + python dependency deck + (False, 5), # source code + dependency + input + output + timeline decks (True, 0), ], ) @@ -82,7 +82,7 @@ def t1(a: int) -> str: True, 4, # time line deck + dependency + input and output decks ), - (None, True, 2), # source code deck + python dependency deck + (None, True, 5), # source code + dependency + input + output + timeline decks ], ) @mock.patch("flytekit.deck.deck._output_deck") @@ -135,16 +135,16 @@ def t1(a: int) -> str: ( None, False, - 3, + 6, False, - ), # default deck + source code deck + python dependency deck + ), # default deck + source code + dependency + input + output + timeline decks (None, True, 1, False), # default deck ( True, None, - 3, + 6, False, - ), # default deck + source code deck + python dependency deck + ), # default deck + source code + dependency + input + output + timeline decks (False, None, 1, False), # default deck (True, True, -1, True), # Set both disable_deck and enable_deck to True and confirm that it fails (False, False, -1, True), # Set both disable_deck and enable_deck to False and confirm that it fails From 92fbd368465ab314d7b1c3488e0a034f119490d2 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 1 Jul 2024 14:09:21 -0700 Subject: [PATCH 13/15] kevin's update Signed-off-by: Kevin Su --- flytekit/core/base_task.py | 2 +- flytekit/deck/deck.py | 76 +++++++++++++------ .../flytekitplugins/deck/renderer.py | 21 +---- plugins/flytekit-deck-standard/setup.py | 2 +- tests/flytekit/unit/deck/test_deck.py | 2 +- 5 files changed, 58 insertions(+), 45 deletions(-) diff --git a/flytekit/core/base_task.py b/flytekit/core/base_task.py index 132cd6d474..3705753043 100644 --- a/flytekit/core/base_task.py +++ b/flytekit/core/base_task.py @@ -506,7 +506,7 @@ def __init__( if disable_deck is not None: warnings.warn( - "disable_deck was deprecated in 1.10.0, please use enable_deck and decks instead", + "disable_deck was deprecated in 1.10.0, please use enable_deck instead", FutureWarning, ) diff --git a/flytekit/deck/deck.py b/flytekit/deck/deck.py index b8b13cd016..4d9f999449 100644 --- a/flytekit/deck/deck.py +++ b/flytekit/deck/deck.py @@ -103,22 +103,9 @@ def append_time_info(self, info: dict): @property def html(self) -> str: - gantt_dep_installed = False - try: - from flytekitplugins.deck.renderer import TableRenderer - except ImportError: - warning_info = "Plugin 'flytekit-deck-standard' is not installed. To display time line, install the plugin in the image." - logger.warning(warning_info) - return warning_info - from flytekitplugins.deck.renderer import GanttChartRenderer, px_installed - - gantt_dep_installed = px_installed if len(self.time_info) == 0: return "" - import pandas - - df = pandas.DataFrame(self.time_info) note = """

Note:

    @@ -126,16 +113,59 @@ def html(self) -> str:
  1. For accurate execution time measurements, users should refer to wall time and process time.
""" - # set the accuracy to microsecond - df["ProcessTime"] = df["ProcessTime"].apply(lambda time: "{:.6f}".format(time)) - df["WallTime"] = df["WallTime"].apply(lambda time: "{:.6f}".format(time)) - - gantt_chart_html = GanttChartRenderer().to_html(df) if gantt_dep_installed else "" - time_table_html = TableRenderer().to_html( - df[["Name", "WallTime", "ProcessTime"]], - header_labels=["Name", "Wall Time(s)", "Process Time(s)"], - ) - return gantt_chart_html + time_table_html + note + + try: + import pandas + from flytekitplugins.deck.renderer import GanttChartRenderer, TableRenderer + + df = pandas.DataFrame(self.time_info) + # set the accuracy to microsecond + df["ProcessTime"] = df["ProcessTime"].apply(lambda time: "{:.6f}".format(time)) + df["WallTime"] = df["WallTime"].apply(lambda time: "{:.6f}".format(time)) + + gantt_chart_html = GanttChartRenderer().to_html(df) + time_table_html = TableRenderer().to_html( + df[["Name", "WallTime", "ProcessTime"]], + header_labels=["Name", "Wall Time(s)", "Process Time(s)"], + ) + return gantt_chart_html + time_table_html + note + except ImportError: + warning_info = ( + "

To display timeline chart, please install flytekitplugins-deck-standard in the image.

" + ) + logger.warning(warning_info) + + return warning_info + generate_time_table(self.time_info) + note + + +def generate_time_table(data: dict) -> str: + html = [ + '', + """ + + + + + + + + """, + "", + ] + + # Add table headers + + # Add table rows + for row in data: + html.append("") + html.append(f"") + html.append(f"") + html.append(f"") + html.append("") + html.append("") + + html.append("
NameWall Time(s)Process Time(s)
{row['Name']}{row['WallTime']:.6f}{row['ProcessTime']:.6f}
") + return "".join(html) def _get_deck( diff --git a/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py b/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py index 17f51060ea..b17301100b 100644 --- a/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py +++ b/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py @@ -4,27 +4,16 @@ from flytekit import lazy_module from flytekit.types.file import FlyteFile -px_installed: bool = False if TYPE_CHECKING: import markdown import pandas as pd import PIL.Image - - try: - import plotly.express as px - - px_installed = True - except ImportError: - pass + import plotly.express as px else: pd = lazy_module("pandas") markdown = lazy_module("markdown") PIL = lazy_module("PIL") - try: - px = lazy_module("plotly.express") - px_installed = True - except ImportError: - pass + px = lazy_module("plotly.express") class SourceCodeRenderer: @@ -107,8 +96,6 @@ class BoxRenderer: # More detail, see https://plotly.com/python/box-plots/ def __init__(self, column_name): self._column_name = column_name - if not px_installed: - raise ImportError("plotly is not installed") def to_html(self, df: "pd.DataFrame") -> str: fig = px.box(df, y=self._column_name) @@ -199,10 +186,6 @@ class GanttChartRenderer: - "Name": string (the name of the task or event) """ - def __init__(self) -> None: - if not px_installed: - raise ImportError("plotly is not installed") - def to_html(self, df: pd.DataFrame, chart_width: Optional[int] = None) -> str: fig = px.timeline(df, x_start="Start", x_end="Finish", y="Name", color="Name", width=chart_width) diff --git a/plugins/flytekit-deck-standard/setup.py b/plugins/flytekit-deck-standard/setup.py index 494294d6db..b0d2c4783d 100644 --- a/plugins/flytekit-deck-standard/setup.py +++ b/plugins/flytekit-deck-standard/setup.py @@ -7,7 +7,7 @@ plugin_requires = [ "flytekit", "markdown", - # "plotly", + "plotly", # ydata-profiling is not compatible with python 3.12 yet: https://github.com/ydataai/ydata-profiling/issues/1510 "ydata-profiling; python_version<'3.12'", "pandas", diff --git a/tests/flytekit/unit/deck/test_deck.py b/tests/flytekit/unit/deck/test_deck.py index c19318e9b8..ce07317a94 100644 --- a/tests/flytekit/unit/deck/test_deck.py +++ b/tests/flytekit/unit/deck/test_deck.py @@ -184,7 +184,7 @@ def t_df(a: str) -> int: def test_deck_deprecation_warning_disable_deck(): - warn_msg = "disable_deck was deprecated in 1.10.0, please use enable_deck and decks instead" + warn_msg = "disable_deck was deprecated in 1.10.0, please use enable_deck instead" with pytest.warns(FutureWarning, match=warn_msg): @task(disable_deck=False) From 069b93560d588c838745fa317e94b72395a77540 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 1 Jul 2024 14:25:42 -0700 Subject: [PATCH 14/15] nit Signed-off-by: Kevin Su --- flytekit/deck/deck.py | 2 -- .../flytekit-deck-standard/flytekitplugins/deck/renderer.py | 2 +- plugins/flytekit-deck-standard/tests/test_renderer.py | 3 --- 3 files changed, 1 insertion(+), 6 deletions(-) diff --git a/flytekit/deck/deck.py b/flytekit/deck/deck.py index 4d9f999449..1025faa65e 100644 --- a/flytekit/deck/deck.py +++ b/flytekit/deck/deck.py @@ -153,8 +153,6 @@ def generate_time_table(data: dict) -> str: "", ] - # Add table headers - # Add table rows for row in data: html.append("") diff --git a/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py b/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py index b17301100b..fbf05f0efe 100644 --- a/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py +++ b/plugins/flytekit-deck-standard/flytekitplugins/deck/renderer.py @@ -12,8 +12,8 @@ else: pd = lazy_module("pandas") markdown = lazy_module("markdown") - PIL = lazy_module("PIL") px = lazy_module("plotly.express") + PIL = lazy_module("PIL") class SourceCodeRenderer: diff --git a/plugins/flytekit-deck-standard/tests/test_renderer.py b/plugins/flytekit-deck-standard/tests/test_renderer.py index 44d3ab0259..dbe157cefc 100644 --- a/plugins/flytekit-deck-standard/tests/test_renderer.py +++ b/plugins/flytekit-deck-standard/tests/test_renderer.py @@ -12,7 +12,6 @@ MarkdownRenderer, SourceCodeRenderer, TableRenderer, - px_installed, ) from PIL import Image @@ -45,7 +44,6 @@ def test_markdown_renderer(): assert renderer.to_html(md_text) == markdown.markdown(md_text) -@pytest.mark.skipif(not px_installed, reason="Plotly is not installed") def test_box_renderer(): renderer = BoxRenderer("Name") assert "Plotlyconfig = {Mathjaxconfig: 'Local'}" in renderer.to_html(df).title() @@ -82,7 +80,6 @@ def test_table_renderer(): assert "Dataframe Table-Class" in renderer.to_html(time_info_df).title() -@pytest.mark.skipif(not px_installed, reason="Plotly is not installed") def test_gantt_chart_renderer(): renderer = GanttChartRenderer() assert "Plotlyconfig = {Mathjaxconfig: 'Local'}" in renderer.to_html(time_info_df).title() From fb5806086926c1bc93452868fb612dff37bb75e3 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 1 Jul 2024 14:53:03 -0700 Subject: [PATCH 15/15] remove chart Signed-off-by: Kevin Su --- flytekit/deck/deck.py | 23 +---------------------- 1 file changed, 1 insertion(+), 22 deletions(-) diff --git a/flytekit/deck/deck.py b/flytekit/deck/deck.py index 1025faa65e..fbb398ef49 100644 --- a/flytekit/deck/deck.py +++ b/flytekit/deck/deck.py @@ -114,28 +114,7 @@ def html(self) -> str: """ - try: - import pandas - from flytekitplugins.deck.renderer import GanttChartRenderer, TableRenderer - - df = pandas.DataFrame(self.time_info) - # set the accuracy to microsecond - df["ProcessTime"] = df["ProcessTime"].apply(lambda time: "{:.6f}".format(time)) - df["WallTime"] = df["WallTime"].apply(lambda time: "{:.6f}".format(time)) - - gantt_chart_html = GanttChartRenderer().to_html(df) - time_table_html = TableRenderer().to_html( - df[["Name", "WallTime", "ProcessTime"]], - header_labels=["Name", "Wall Time(s)", "Process Time(s)"], - ) - return gantt_chart_html + time_table_html + note - except ImportError: - warning_info = ( - "

To display timeline chart, please install flytekitplugins-deck-standard in the image.

" - ) - logger.warning(warning_info) - - return warning_info + generate_time_table(self.time_info) + note + return generate_time_table(self.time_info) + note def generate_time_table(data: dict) -> str: