Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core/enable deck #2314

Merged
merged 16 commits into from
Jul 2, 2024
60 changes: 47 additions & 13 deletions flytekit/core/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
from flytekit.core.tracker import TrackedInstance
from flytekit.core.type_engine import TypeEngine, TypeTransformerFailedError
from flytekit.core.utils import timeit
from flytekit.deck import DeckField
from flytekit.loggers import logger
from flytekit.models import dynamic_job as _dynamic_job
from flytekit.models import interface as _interface_models
Expand Down Expand Up @@ -464,6 +465,13 @@
environment: Optional[Dict[str, str]] = None,
disable_deck: Optional[bool] = None,
enable_deck: Optional[bool] = None,
deck_fields: Optional[Tuple[DeckField, ...]] = (
DeckField.SOURCE_CODE,
DeckField.DEPENDENCIES,
DeckField.TIMELINE,
DeckField.INPUT,
DeckField.OUTPUT,
),
**kwargs,
):
"""
Expand All @@ -479,6 +487,8 @@
execution of the task. Supplied as a dictionary of key/value pairs
disable_deck (bool): (deprecated) If true, this task will not output deck html file
enable_deck (bool): If true, this task will output deck html file
deck_fields (Tuple[DeckField]): Tuple of decks to be
generated for this task. Valid values can be selected from fields of ``flytekit.deck.DeckField`` enum
"""
super().__init__(
task_type=task_type,
Expand All @@ -490,22 +500,35 @@
self._environment = environment if environment else {}
self._task_config = task_config

# first we resolve the conflict between params regarding decks, if any two of [disable_deck, enable_deck]
# are set, we raise an error
configured_deck_params = [disable_deck is not None, enable_deck is not None]
if sum(configured_deck_params) > 1:
raise ValueError("only one of [disable_deck, enable_deck] can be set")

Check warning on line 507 in flytekit/core/base_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/base_task.py#L507

Added line #L507 was not covered by tests

if disable_deck is not None:
warnings.warn(
"disable_deck was deprecated in 1.10.0, please use enable_deck instead",
FutureWarning,
)

# Confirm that disable_deck and enable_deck do not contradict each other
if disable_deck is not None and enable_deck is not None:
raise ValueError("disable_deck and enable_deck cannot both be set at the same time")

if enable_deck is not None:
self._disable_deck = not enable_deck
elif disable_deck is not None:
self._disable_deck = disable_deck
else:
self._disable_deck = True

self._deck_fields = list(deck_fields) if (deck_fields is not None and self.disable_deck is False) else []

deck_members = set([_field for _field in DeckField])
# enumerate additional decks, check if any of them are invalid
for deck in self._deck_fields:
if deck not in deck_members:
raise ValueError(

Check warning on line 528 in flytekit/core/base_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/base_task.py#L528

Added line #L528 was not covered by tests
f"Element {deck} from deck_fields param is not a valid deck field. Please use one of {deck_members}"
)

if self._python_interface.docstring:
if self.docs is None:
self._docs = Documentation(
Expand Down Expand Up @@ -645,18 +668,20 @@

def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_params):
if self._disable_deck is False:
from flytekit.deck.deck import Deck, _output_deck
from flytekit.deck.deck import Deck, DeckField, _output_deck

Check warning on line 671 in flytekit/core/base_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/base_task.py#L671

Added line #L671 was not covered by tests

INPUT = "Inputs"
OUTPUT = "Outputs"
INPUT = DeckField.INPUT
OUTPUT = DeckField.OUTPUT

Check warning on line 674 in flytekit/core/base_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/base_task.py#L673-L674

Added lines #L673 - L674 were not covered by tests

input_deck = Deck(INPUT)
for k, v in native_inputs.items():
input_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_input_var(k, v)))
if DeckField.INPUT in self.deck_fields:
input_deck = Deck(INPUT.value)

Check warning on line 677 in flytekit/core/base_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/base_task.py#L677

Added line #L677 was not covered by tests
for k, v in native_inputs.items():
input_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_input_var(k, v)))

Check warning on line 679 in flytekit/core/base_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/base_task.py#L679

Added line #L679 was not covered by tests

output_deck = Deck(OUTPUT)
for k, v in native_outputs_as_map.items():
output_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_output_var(k, v)))
if DeckField.OUTPUT in self.deck_fields:
output_deck = Deck(OUTPUT.value)

Check warning on line 682 in flytekit/core/base_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/base_task.py#L682

Added line #L682 was not covered by tests
for k, v in native_outputs_as_map.items():
output_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_output_var(k, v)))

Check warning on line 684 in flytekit/core/base_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/base_task.py#L684

Added line #L684 was not covered by tests

if ctx.execution_state and ctx.execution_state.is_local_execution():
# When we run the workflow remotely, flytekit outputs decks at the end of _dispatch_execute
Expand All @@ -681,6 +706,8 @@
may be none
* ``DynamicJobSpec`` is returned when a dynamic workflow is executed
"""
if DeckField.TIMELINE.value in self.deck_fields and ctx.user_space_params is not None:
ctx.user_space_params.decks.append(ctx.user_space_params.timeline_deck)

Check warning on line 710 in flytekit/core/base_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/base_task.py#L710

Added line #L710 was not covered by tests
# Invoked before the task is executed
new_user_params = self.pre_execute(ctx.user_space_params)

Expand Down Expand Up @@ -791,6 +818,13 @@
"""
return self._disable_deck

@property
def deck_fields(self) -> List[DeckField]:
"""
If not empty, this task will output deck html file for the specified decks
"""
return self._deck_fields

Check warning on line 826 in flytekit/core/base_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/base_task.py#L826

Added line #L826 was not covered by tests


class TaskResolverMixin(object):
"""
Expand Down
12 changes: 10 additions & 2 deletions flytekit/core/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
execution_date: typing.Optional[datetime] = None
logging: Optional[_logging.Logger] = None
task_id: typing.Optional[_identifier.Identifier] = None
output_metadata_prefix: Optional[str] = None
pingsutw marked this conversation as resolved.
Show resolved Hide resolved

def __init__(self, current: typing.Optional[ExecutionParameters] = None):
self.stats = current.stats if current else None
Expand All @@ -100,6 +101,7 @@
self.attrs = current._attrs if current else {}
self.raw_output_prefix = current.raw_output_prefix if current else None
self.task_id = current.task_id if current else None
self.output_metadata_prefix = current.output_metadata_prefix if current else None

Check warning on line 104 in flytekit/core/context_manager.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/context_manager.py#L104

Added line #L104 was not covered by tests

def add_attr(self, key: str, v: typing.Any) -> ExecutionParameters.Builder:
self.attrs[key] = v
Expand All @@ -118,6 +120,7 @@
decks=self.decks,
raw_output_prefix=self.raw_output_prefix,
task_id=self.task_id,
output_metadata_prefix=self.output_metadata_prefix,
**self.attrs,
)

Expand Down Expand Up @@ -181,6 +184,7 @@
self._checkpoint = checkpoint
self._decks = decks
self._task_id = task_id
self._timeline_deck = None

@property
def stats(self) -> taggable.TaggableStats:
Expand Down Expand Up @@ -273,16 +277,20 @@

@property
def timeline_deck(self) -> "TimeLineDeck": # type: ignore
from flytekit.deck.deck import TimeLineDeck
from flytekit.deck.deck import DeckField, TimeLineDeck

Check warning on line 280 in flytekit/core/context_manager.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/context_manager.py#L280

Added line #L280 was not covered by tests

time_line_deck = None
for deck in self.decks:
if isinstance(deck, TimeLineDeck):
time_line_deck = deck
break
if time_line_deck is None:
time_line_deck = TimeLineDeck("Timeline")
if self._timeline_deck is not None:
time_line_deck = self._timeline_deck

Check warning on line 289 in flytekit/core/context_manager.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/context_manager.py#L289

Added line #L289 was not covered by tests
else:
time_line_deck = TimeLineDeck(DeckField.TIMELINE.value, auto_add_to_deck=False)

Check warning on line 291 in flytekit/core/context_manager.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/context_manager.py#L291

Added line #L291 was not covered by tests

self._timeline_deck = time_line_deck

Check warning on line 293 in flytekit/core/context_manager.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/context_manager.py#L293

Added line #L293 was not covered by tests
return time_line_deck

def __getattr__(self, attr_name: str) -> typing.Any:
Expand Down
16 changes: 9 additions & 7 deletions flytekit/core/python_function_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,20 +352,22 @@

def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_params):
if self._disable_deck is False:
from flytekit.deck import Deck
from flytekit.deck import Deck, DeckField

Check warning on line 355 in flytekit/core/python_function_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/python_function_task.py#L355

Added line #L355 was not covered by tests
from flytekit.deck.renderer import PythonDependencyRenderer

# These errors are raised if the source code can not be retrieved
with suppress(OSError, TypeError):
source_code = inspect.getsource(self._task_function)
from flytekit.deck.renderer import SourceCodeRenderer

source_code_deck = Deck("Source Code")
renderer = SourceCodeRenderer()
source_code_deck.append(renderer.to_html(source_code))
if DeckField.SOURCE_CODE in self.deck_fields:
source_code_deck = Deck(DeckField.SOURCE_CODE.value)
renderer = SourceCodeRenderer()
source_code_deck.append(renderer.to_html(source_code))

Check warning on line 366 in flytekit/core/python_function_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/python_function_task.py#L364-L366

Added lines #L364 - L366 were not covered by tests

python_dependencies_deck = Deck("Dependencies")
renderer = PythonDependencyRenderer()
python_dependencies_deck.append(renderer.to_html())
if DeckField.DEPENDENCIES in self.deck_fields:
python_dependencies_deck = Deck(DeckField.DEPENDENCIES.value)
renderer = PythonDependencyRenderer()
python_dependencies_deck.append(renderer.to_html())

Check warning on line 371 in flytekit/core/python_function_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/python_function_task.py#L369-L371

Added lines #L369 - L371 were not covered by tests

return super()._write_decks(native_inputs, native_outputs_as_map, ctx, new_user_params)
12 changes: 12 additions & 0 deletions flytekit/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from flytekit.core.python_function_task import PythonFunctionTask
from flytekit.core.reference_entity import ReferenceEntity, TaskReference
from flytekit.core.resources import Resources
from flytekit.deck import DeckField
from flytekit.extras.accelerators import BaseAccelerator
from flytekit.image_spec.image_spec import ImageSpec
from flytekit.models.documentation import Documentation
Expand Down Expand Up @@ -114,6 +115,7 @@ def task(
docs: Optional[Documentation] = ...,
disable_deck: Optional[bool] = ...,
enable_deck: Optional[bool] = ...,
deck_fields: Optional[Tuple[DeckField, ...]] = ...,
pod_template: Optional["PodTemplate"] = ...,
pod_template_name: Optional[str] = ...,
accelerator: Optional[BaseAccelerator] = ...,
Expand Down Expand Up @@ -151,6 +153,7 @@ def task(
docs: Optional[Documentation] = ...,
disable_deck: Optional[bool] = ...,
enable_deck: Optional[bool] = ...,
deck_fields: Optional[Tuple[DeckField, ...]] = ...,
pod_template: Optional["PodTemplate"] = ...,
pod_template_name: Optional[str] = ...,
accelerator: Optional[BaseAccelerator] = ...,
Expand Down Expand Up @@ -187,6 +190,13 @@ def task(
docs: Optional[Documentation] = None,
disable_deck: Optional[bool] = None,
enable_deck: Optional[bool] = None,
deck_fields: Optional[Tuple[DeckField, ...]] = (
DeckField.SOURCE_CODE,
DeckField.DEPENDENCIES,
DeckField.TIMELINE,
DeckField.INPUT,
DeckField.OUTPUT,
),
pod_template: Optional["PodTemplate"] = None,
pod_template_name: Optional[str] = None,
accelerator: Optional[BaseAccelerator] = None,
Expand Down Expand Up @@ -307,6 +317,7 @@ def launch_dynamically():
:param task_resolver: Provide a custom task resolver.
:param disable_deck: (deprecated) If true, this task will not output deck html file
:param enable_deck: If true, this task will output deck html file
:param deck_fields: If specified and enble_deck is True, this task will output deck html file with the fields specified in the tuple
:param docs: Documentation about this task
:param pod_template: Custom PodTemplate for this task.
:param pod_template_name: The name of the existing PodTemplate resource which will be used in this task.
Expand Down Expand Up @@ -339,6 +350,7 @@ def wrapper(fn: Callable[..., Any]) -> PythonFunctionTask[T]:
task_resolver=task_resolver,
disable_deck=disable_deck,
enable_deck=enable_deck,
deck_fields=deck_fields,
docs=docs,
pod_template=pod_template,
pod_template_name=pod_template_name,
Expand Down
2 changes: 1 addition & 1 deletion flytekit/deck/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@
SourceCodeRenderer
"""

from .deck import Deck
from .deck import Deck, DeckField
from .renderer import MarkdownRenderer, SourceCodeRenderer, TopFrameRenderer
71 changes: 48 additions & 23 deletions flytekit/deck/deck.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import enum
import os
import typing
from typing import Optional
Expand All @@ -10,6 +11,18 @@
DECK_FILE_NAME = "deck.html"


class DeckField(str, enum.Enum):
"""
DeckField is used to specify the fields that will be rendered in the deck.
"""

INPUT = "Input"
OUTPUT = "Output"
SOURCE_CODE = "Source Code"
TIMELINE = "Timeline"
thomasjpfan marked this conversation as resolved.
Show resolved Hide resolved
DEPENDENCIES = "Dependencies"


class Deck:
"""
Deck enable users to get customizable and default visibility into their tasks.
Expand Down Expand Up @@ -52,10 +65,11 @@
return iris_df
"""

def __init__(self, name: str, html: Optional[str] = ""):
def __init__(self, name: str, html: Optional[str] = "", auto_add_to_deck: bool = True):
self._name = name
self._html = html
FlyteContextManager.current_context().user_space_params.decks.append(self)
if auto_add_to_deck:
FlyteContextManager.current_context().user_space_params.decks.append(self)

Check warning on line 72 in flytekit/deck/deck.py

View check run for this annotation

Codecov / codecov/patch

flytekit/deck/deck.py#L72

Added line #L72 was not covered by tests

def append(self, html: str) -> "Deck":
assert isinstance(html, str)
Expand All @@ -79,8 +93,8 @@
Instead, the complete data set is used to create a comprehensive visualization of the execution time of each part of the task.
"""

def __init__(self, name: str, html: Optional[str] = ""):
super().__init__(name, html)
def __init__(self, name: str, html: Optional[str] = "", auto_add_to_deck: bool = True):
super().__init__(name, html, auto_add_to_deck)

Check warning on line 97 in flytekit/deck/deck.py

View check run for this annotation

Codecov / codecov/patch

flytekit/deck/deck.py#L97

Added line #L97 was not covered by tests
self.time_info = []

def append_time_info(self, info: dict):
Expand All @@ -89,36 +103,46 @@

@property
def html(self) -> str:
try:
from flytekitplugins.deck.renderer import GanttChartRenderer, TableRenderer
except ImportError:
warning_info = "Plugin 'flytekit-deck-standard' is not installed. To display time line, install the plugin in the image."
logger.warning(warning_info)
return warning_info

if len(self.time_info) == 0:
return ""

import pandas

df = pandas.DataFrame(self.time_info)
note = """
<p><strong>Note:</strong></p>
<ol>
<li>if the time duration is too small(< 1ms), it may be difficult to see on the time line graph.</li>
<li>For accurate execution time measurements, users should refer to wall time and process time.</li>
</ol>
"""
# set the accuracy to microsecond
df["ProcessTime"] = df["ProcessTime"].apply(lambda time: "{:.6f}".format(time))
df["WallTime"] = df["WallTime"].apply(lambda time: "{:.6f}".format(time))

gantt_chart_html = GanttChartRenderer().to_html(df)
time_table_html = TableRenderer().to_html(
df[["Name", "WallTime", "ProcessTime"]],
header_labels=["Name", "Wall Time(s)", "Process Time(s)"],
)
return gantt_chart_html + time_table_html + note
return generate_time_table(self.time_info) + note

Check warning on line 117 in flytekit/deck/deck.py

View check run for this annotation

Codecov / codecov/patch

flytekit/deck/deck.py#L117

Added line #L117 was not covered by tests


def generate_time_table(data: dict) -> str:
html = [

Check warning on line 121 in flytekit/deck/deck.py

View check run for this annotation

Codecov / codecov/patch

flytekit/deck/deck.py#L121

Added line #L121 was not covered by tests
'<table border="1" cellpadding="5" cellspacing="0" style="border-collapse: collapse; width: 100%;">',
"""
<thead>
<tr>
<th>Name</th>
<th>Wall Time(s)</th>
<th>Process Time(s)</th>
</tr>
</thead>
""",
"<tbody>",
]

# Add table rows
for row in data:
html.append("<tr>")
html.append(f"<td>{row['Name']}</td>")
html.append(f"<td>{row['WallTime']:.6f}</td>")
html.append(f"<td>{row['ProcessTime']:.6f}</td>")
html.append("</tr>")
html.append("</tbody>")

Check warning on line 142 in flytekit/deck/deck.py

View check run for this annotation

Codecov / codecov/patch

flytekit/deck/deck.py#L137-L142

Added lines #L137 - L142 were not covered by tests

html.append("</table>")
return "".join(html)

Check warning on line 145 in flytekit/deck/deck.py

View check run for this annotation

Codecov / codecov/patch

flytekit/deck/deck.py#L144-L145

Added lines #L144 - L145 were not covered by tests


def _get_deck(
Expand All @@ -129,6 +153,7 @@
If ignore_jupyter is set to True, then it will return a str even in a jupyter environment.
"""
deck_map = {deck.name: deck.html for deck in new_user_params.decks}

raw_html = get_deck_template().render(metadata=deck_map)
if not ignore_jupyter and ipython_check():
try:
Expand Down
Loading
Loading