Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core/enable deck #2314

Merged
merged 16 commits into from
Jul 2, 2024
56 changes: 42 additions & 14 deletions flytekit/core/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
from flytekit.core.tracker import TrackedInstance
from flytekit.core.type_engine import TypeEngine, TypeTransformerFailedError
from flytekit.core.utils import timeit
from flytekit.deck.deck import DeckFields
from flytekit.loggers import logger
from flytekit.models import dynamic_job as _dynamic_job
from flytekit.models import interface as _interface_models
Expand Down Expand Up @@ -462,6 +463,7 @@
environment: Optional[Dict[str, str]] = None,
disable_deck: Optional[bool] = None,
enable_deck: Optional[bool] = None,
decks: Optional[Tuple[str, ...]] = None,
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
**kwargs,
):
"""
Expand All @@ -477,6 +479,8 @@
execution of the task. Supplied as a dictionary of key/value pairs
disable_deck (bool): (deprecated) If true, this task will not output deck html file
enable_deck (bool): If true, this task will output deck html file
decks (Tuple[str]): Tuple of decks to be
generated for this task. Valid values can be selected from fields of ``flytekit.deck.deck.DeckFields`` enum
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
"""
super().__init__(
task_type=task_type,
Expand All @@ -488,22 +492,35 @@
self._environment = environment if environment else {}
self._task_config = task_config

# first we resolve the conflict between params regarding decks, if any two of [disable_deck, enable_deck]
# are set, we raise an error
configured_deck_params = [disable_deck is not None, enable_deck is not None]
if sum(configured_deck_params) > 1:
raise ValueError("only one of [disable_deck, enable_deck] can be set")

Check warning on line 499 in flytekit/core/base_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/base_task.py#L499

Added line #L499 was not covered by tests

if disable_deck is not None:
warnings.warn(
"disable_deck was deprecated in 1.10.0, please use enable_deck instead",
"disable_deck was deprecated in 1.10.0, please use enable_deck and decks instead",
FutureWarning,
)

# Confirm that disable_deck and enable_deck do not contradict each other
if disable_deck is not None and enable_deck is not None:
raise ValueError("disable_deck and enable_deck cannot both be set at the same time")

if enable_deck is not None:
self._disable_deck = not enable_deck
elif disable_deck is not None:
self._disable_deck = disable_deck
else:
self._disable_deck = True

self._decks = list(decks) if (decks is not None and self.disable_deck is False) else []

deck_members = set([_field.value for _field in DeckFields])
# enumerate additional decks, check if any of them are invalid
for deck in self._decks:
if deck not in deck_members:
raise ValueError(

Check warning on line 520 in flytekit/core/base_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/base_task.py#L520

Added line #L520 was not covered by tests
f"Element {deck} from decks param is not a valid deck field. Please use one of {deck_members}"
)

if self._python_interface.docstring:
if self.docs is None:
self._docs = Documentation(
Expand Down Expand Up @@ -643,18 +660,20 @@

def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_params):
if self._disable_deck is False:
from flytekit.deck.deck import Deck, _output_deck
from flytekit.deck.deck import Deck, DeckFields, _output_deck

Check warning on line 663 in flytekit/core/base_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/base_task.py#L663

Added line #L663 was not covered by tests

INPUT = "Inputs"
OUTPUT = "Outputs"
INPUT = DeckFields.INPUT
OUTPUT = DeckFields.OUTPUT

Check warning on line 666 in flytekit/core/base_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/base_task.py#L665-L666

Added lines #L665 - L666 were not covered by tests

input_deck = Deck(INPUT)
for k, v in native_inputs.items():
input_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_input_var(k, v)))
if DeckFields.INPUT in self.decks:
input_deck = Deck(INPUT.value)

Check warning on line 669 in flytekit/core/base_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/base_task.py#L669

Added line #L669 was not covered by tests
for k, v in native_inputs.items():
input_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_input_var(k, v)))

Check warning on line 671 in flytekit/core/base_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/base_task.py#L671

Added line #L671 was not covered by tests

output_deck = Deck(OUTPUT)
for k, v in native_outputs_as_map.items():
output_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_output_var(k, v)))
if DeckFields.OUTPUT in self.decks:
output_deck = Deck(OUTPUT.value)

Check warning on line 674 in flytekit/core/base_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/base_task.py#L674

Added line #L674 was not covered by tests
for k, v in native_outputs_as_map.items():
output_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_output_var(k, v)))

Check warning on line 676 in flytekit/core/base_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/base_task.py#L676

Added line #L676 was not covered by tests

if ctx.execution_state and ctx.execution_state.is_local_execution():
# When we run the workflow remotely, flytekit outputs decks at the end of _dispatch_execute
Expand All @@ -679,6 +698,8 @@
may be none
* ``DynamicJobSpec`` is returned when a dynamic workflow is executed
"""
if DeckFields.TIMELINE.value in self.decks and ctx.user_space_params is not None:
ctx.user_space_params.decks.append(ctx.user_space_params.timeline_deck)

Check warning on line 702 in flytekit/core/base_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/base_task.py#L702

Added line #L702 was not covered by tests
# Invoked before the task is executed
new_user_params = self.pre_execute(ctx.user_space_params)

Expand Down Expand Up @@ -789,6 +810,13 @@
"""
return self._disable_deck

@property
def decks(self) -> List[str]:
"""
If not empty, this task will output deck html file for the specified decks
"""
return self._decks

Check warning on line 818 in flytekit/core/base_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/base_task.py#L818

Added line #L818 was not covered by tests


class TaskResolverMixin(object):
"""
Expand Down
12 changes: 10 additions & 2 deletions flytekit/core/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
execution_date: typing.Optional[datetime] = None
logging: Optional[_logging.Logger] = None
task_id: typing.Optional[_identifier.Identifier] = None
output_metadata_prefix: Optional[str] = None
pingsutw marked this conversation as resolved.
Show resolved Hide resolved

def __init__(self, current: typing.Optional[ExecutionParameters] = None):
self.stats = current.stats if current else None
Expand All @@ -101,6 +102,7 @@
self.attrs = current._attrs if current else {}
self.raw_output_prefix = current.raw_output_prefix if current else None
self.task_id = current.task_id if current else None
self.output_metadata_prefix = current.output_metadata_prefix if current else None

Check warning on line 105 in flytekit/core/context_manager.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/context_manager.py#L105

Added line #L105 was not covered by tests

def add_attr(self, key: str, v: typing.Any) -> ExecutionParameters.Builder:
self.attrs[key] = v
Expand All @@ -119,6 +121,7 @@
decks=self.decks,
raw_output_prefix=self.raw_output_prefix,
task_id=self.task_id,
output_metadata_prefix=self.output_metadata_prefix,
**self.attrs,
)

Expand Down Expand Up @@ -182,6 +185,7 @@
self._checkpoint = checkpoint
self._decks = decks
self._task_id = task_id
self._timeline_deck = None

@property
def stats(self) -> taggable.TaggableStats:
Expand Down Expand Up @@ -274,16 +278,20 @@

@property
def timeline_deck(self) -> "TimeLineDeck": # type: ignore
from flytekit.deck.deck import TimeLineDeck
from flytekit.deck.deck import DeckFields, TimeLineDeck

Check warning on line 281 in flytekit/core/context_manager.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/context_manager.py#L281

Added line #L281 was not covered by tests

time_line_deck = None
for deck in self.decks:
if isinstance(deck, TimeLineDeck):
time_line_deck = deck
break
if time_line_deck is None:
time_line_deck = TimeLineDeck("Timeline")
if self._timeline_deck is not None:
time_line_deck = self._timeline_deck

Check warning on line 290 in flytekit/core/context_manager.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/context_manager.py#L290

Added line #L290 was not covered by tests
else:
time_line_deck = TimeLineDeck(DeckFields.TIMELINE.value, auto_add_to_deck=False)

Check warning on line 292 in flytekit/core/context_manager.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/context_manager.py#L292

Added line #L292 was not covered by tests

self._timeline_deck = time_line_deck

Check warning on line 294 in flytekit/core/context_manager.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/context_manager.py#L294

Added line #L294 was not covered by tests
return time_line_deck

def __getattr__(self, attr_name: str) -> typing.Any:
Expand Down
15 changes: 9 additions & 6 deletions flytekit/core/python_function_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,19 +351,22 @@
def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_params):
if self._disable_deck is False:
from flytekit.deck import Deck
from flytekit.deck.deck import DeckFields

Check warning on line 354 in flytekit/core/python_function_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/python_function_task.py#L354

Added line #L354 was not covered by tests
from flytekit.deck.renderer import PythonDependencyRenderer

# These errors are raised if the source code can not be retrieved
with suppress(OSError, TypeError):
source_code = inspect.getsource(self._task_function)
from flytekit.deck.renderer import SourceCodeRenderer

source_code_deck = Deck("Source Code")
renderer = SourceCodeRenderer()
source_code_deck.append(renderer.to_html(source_code))
if DeckFields.SOURCE_CODE in self.decks:
source_code_deck = Deck(DeckFields.SOURCE_CODE.value)
renderer = SourceCodeRenderer()
source_code_deck.append(renderer.to_html(source_code))

Check warning on line 365 in flytekit/core/python_function_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/python_function_task.py#L363-L365

Added lines #L363 - L365 were not covered by tests

python_dependencies_deck = Deck("Dependencies")
renderer = PythonDependencyRenderer()
python_dependencies_deck.append(renderer.to_html())
if DeckFields.DEPENDENCIES in self.decks:
python_dependencies_deck = Deck(DeckFields.DEPENDENCIES.value)
renderer = PythonDependencyRenderer()
python_dependencies_deck.append(renderer.to_html())

Check warning on line 370 in flytekit/core/python_function_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/python_function_task.py#L368-L370

Added lines #L368 - L370 were not covered by tests

return super()._write_decks(native_inputs, native_outputs_as_map, ctx, new_user_params)
5 changes: 5 additions & 0 deletions flytekit/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def task(
docs: Optional[Documentation] = ...,
disable_deck: Optional[bool] = ...,
enable_deck: Optional[bool] = ...,
decks: Optional[Tuple[str, ...]] = ...,
pod_template: Optional["PodTemplate"] = ...,
pod_template_name: Optional[str] = ...,
accelerator: Optional[BaseAccelerator] = ...,
Expand Down Expand Up @@ -152,6 +153,7 @@ def task(
docs: Optional[Documentation] = ...,
disable_deck: Optional[bool] = ...,
enable_deck: Optional[bool] = ...,
decks: Optional[Tuple[str, ...]] = ...,
pod_template: Optional["PodTemplate"] = ...,
pod_template_name: Optional[str] = ...,
accelerator: Optional[BaseAccelerator] = ...,
Expand Down Expand Up @@ -189,6 +191,7 @@ def task(
docs: Optional[Documentation] = None,
disable_deck: Optional[bool] = None,
enable_deck: Optional[bool] = None,
decks: Optional[Tuple[str, ...]] = ("Source Code", "Dependencies"),
pod_template: Optional["PodTemplate"] = None,
pod_template_name: Optional[str] = None,
accelerator: Optional[BaseAccelerator] = None,
Expand Down Expand Up @@ -309,6 +312,7 @@ def launch_dynamically():
:param task_resolver: Provide a custom task resolver.
:param disable_deck: (deprecated) If true, this task will not output deck html file
:param enable_deck: If true, this task will output deck html file
:param decks: If specified and enble_deck is True, this task will output deck html file with the fields specified in the list
:param docs: Documentation about this task
:param pod_template: Custom PodTemplate for this task.
:param pod_template_name: The name of the existing PodTemplate resource which will be used in this task.
Expand Down Expand Up @@ -341,6 +345,7 @@ def wrapper(fn: Callable[..., Any]) -> PythonFunctionTask[T]:
task_resolver=task_resolver,
disable_deck=disable_deck,
enable_deck=enable_deck,
decks=decks,
docs=docs,
pod_template=pod_template,
pod_template_name=pod_template_name,
Expand Down
23 changes: 19 additions & 4 deletions flytekit/deck/deck.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import enum
import os
import typing
from typing import Optional
Expand All @@ -10,6 +11,18 @@
DECK_FILE_NAME = "deck.html"


class DeckFields(str, enum.Enum):
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
"""
DeckFields is used to specify the fields that will be rendered in the deck.
"""

INPUT = "Input"
OUTPUT = "Output"
SOURCE_CODE = "Source Code"
TIMELINE = "Timeline"
thomasjpfan marked this conversation as resolved.
Show resolved Hide resolved
DEPENDENCIES = "Dependencies"


class Deck:
"""
Deck enable users to get customizable and default visibility into their tasks.
Expand Down Expand Up @@ -52,10 +65,11 @@
return iris_df
"""

def __init__(self, name: str, html: Optional[str] = ""):
def __init__(self, name: str, html: Optional[str] = "", auto_add_to_deck: bool = True):
self._name = name
self._html = html
FlyteContextManager.current_context().user_space_params.decks.append(self)
if auto_add_to_deck:
FlyteContextManager.current_context().user_space_params.decks.append(self)

Check warning on line 72 in flytekit/deck/deck.py

View check run for this annotation

Codecov / codecov/patch

flytekit/deck/deck.py#L72

Added line #L72 was not covered by tests

def append(self, html: str) -> "Deck":
assert isinstance(html, str)
Expand All @@ -79,8 +93,8 @@
Instead, the complete data set is used to create a comprehensive visualization of the execution time of each part of the task.
"""

def __init__(self, name: str, html: Optional[str] = ""):
super().__init__(name, html)
def __init__(self, name: str, html: Optional[str] = "", auto_add_to_deck: bool = True):
super().__init__(name, html, auto_add_to_deck)

Check warning on line 97 in flytekit/deck/deck.py

View check run for this annotation

Codecov / codecov/patch

flytekit/deck/deck.py#L97

Added line #L97 was not covered by tests
self.time_info = []

def append_time_info(self, info: dict):
Expand Down Expand Up @@ -129,6 +143,7 @@
If ignore_jupyter is set to True, then it will return a str even in a jupyter environment.
"""
deck_map = {deck.name: deck.html for deck in new_user_params.decks}

raw_html = get_deck_template().render(metadata=deck_map)
if not ignore_jupyter and ipython_check():
try:
Expand Down
3 changes: 3 additions & 0 deletions tests/flytekit/unit/core/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@ def test_timeit():
ctx = FlyteContextManager.current_context()
ctx.user_space_params._decks = []

from flytekit.deck.deck import DeckFields

with timeit("Set disable_deck to False"):
kwargs = {}
kwargs["disable_deck"] = False
kwargs["decks"] = (DeckFields.TIMELINE.value,)

ctx = FlyteContextManager.current_context()
time_info_list = ctx.user_space_params.timeline_deck.time_info
Expand Down
Loading
Loading