Skip to content

Commit

Permalink
Core/enable deck (flyteorg#2314)
Browse files Browse the repository at this point in the history
* add additional_decks support

Signed-off-by: novahow <[email protected]>

	modified:   flytekit/core/base_task.py
	modified:   flytekit/core/python_function_task.py
	modified:   flytekit/core/task.py
	modified:   flytekit/deck/deck.py
	modified:   tests/flytekit/unit/core/test_flyte_file.py

* add tests and remove confusing fields

Signed-off-by: novahow <[email protected]>

	modified:   flytekit/core/base_task.py
	modified:   flytekit/deck/deck.py
	modified:   tests/flytekit/unit/deck/test_deck.py

* add deckselector

Signed-off-by: novahow <[email protected]>

	modified:   flytekit/core/base_task.py
	modified:   flytekit/core/context_manager.py
	modified:   flytekit/core/task.py
	modified:   flytekit/deck/deck.py
	modified:   tests/flytekit/unit/deck/test_deck.py

* make deck_selector to tuple

Signed-off-by: novahow <[email protected]>

* fix remote deck bug

Signed-off-by: novahow <[email protected]>

* fix timelinedeck and remove rendered_deck param

Signed-off-by: novahow <[email protected]>

* fix UI

Signed-off-by: novahow <[email protected]>

* fix timelinedeck test multiple time_info

Signed-off-by: novahow <[email protected]>

* nit

Signed-off-by: novahow <[email protected]>

* nit with enum

Signed-off-by: novahow <[email protected]>

* nit deck_fields

Signed-off-by: novahow <[email protected]>

* enable all decks, remove plotly dep

Signed-off-by: novahow <[email protected]>

* kevin's update

Signed-off-by: Kevin Su <[email protected]>

* nit

Signed-off-by: Kevin Su <[email protected]>

* remove chart

Signed-off-by: Kevin Su <[email protected]>

---------

Signed-off-by: novahow <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Co-authored-by: Kevin Su <[email protected]>
Signed-off-by: bugra.gedik <[email protected]>
  • Loading branch information
2 people authored and bugra.gedik committed Jul 3, 2024
1 parent 480d520 commit cbaed0b
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 56 deletions.
60 changes: 47 additions & 13 deletions flytekit/core/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
from flytekit.core.tracker import TrackedInstance
from flytekit.core.type_engine import TypeEngine, TypeTransformerFailedError
from flytekit.core.utils import timeit
from flytekit.deck import DeckField
from flytekit.loggers import logger
from flytekit.models import dynamic_job as _dynamic_job
from flytekit.models import interface as _interface_models
Expand Down Expand Up @@ -464,6 +465,13 @@ def __init__(
environment: Optional[Dict[str, str]] = None,
disable_deck: Optional[bool] = None,
enable_deck: Optional[bool] = None,
deck_fields: Optional[Tuple[DeckField, ...]] = (
DeckField.SOURCE_CODE,
DeckField.DEPENDENCIES,
DeckField.TIMELINE,
DeckField.INPUT,
DeckField.OUTPUT,
),
**kwargs,
):
"""
Expand All @@ -479,6 +487,8 @@ def __init__(
execution of the task. Supplied as a dictionary of key/value pairs
disable_deck (bool): (deprecated) If true, this task will not output deck html file
enable_deck (bool): If true, this task will output deck html file
deck_fields (Tuple[DeckField]): Tuple of decks to be
generated for this task. Valid values can be selected from fields of ``flytekit.deck.DeckField`` enum
"""
super().__init__(
task_type=task_type,
Expand All @@ -490,22 +500,35 @@ def __init__(
self._environment = environment if environment else {}
self._task_config = task_config

# first we resolve the conflict between params regarding decks, if any two of [disable_deck, enable_deck]
# are set, we raise an error
configured_deck_params = [disable_deck is not None, enable_deck is not None]
if sum(configured_deck_params) > 1:
raise ValueError("only one of [disable_deck, enable_deck] can be set")

if disable_deck is not None:
warnings.warn(
"disable_deck was deprecated in 1.10.0, please use enable_deck instead",
FutureWarning,
)

# Confirm that disable_deck and enable_deck do not contradict each other
if disable_deck is not None and enable_deck is not None:
raise ValueError("disable_deck and enable_deck cannot both be set at the same time")

if enable_deck is not None:
self._disable_deck = not enable_deck
elif disable_deck is not None:
self._disable_deck = disable_deck
else:
self._disable_deck = True

self._deck_fields = list(deck_fields) if (deck_fields is not None and self.disable_deck is False) else []

deck_members = set([_field for _field in DeckField])
# enumerate additional decks, check if any of them are invalid
for deck in self._deck_fields:
if deck not in deck_members:
raise ValueError(
f"Element {deck} from deck_fields param is not a valid deck field. Please use one of {deck_members}"
)

if self._python_interface.docstring:
if self.docs is None:
self._docs = Documentation(
Expand Down Expand Up @@ -645,18 +668,20 @@ def _output_to_literal_map(self, native_outputs: Dict[int, Any], ctx: FlyteConte

def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_params):
if self._disable_deck is False:
from flytekit.deck.deck import Deck, _output_deck
from flytekit.deck.deck import Deck, DeckField, _output_deck

INPUT = "Inputs"
OUTPUT = "Outputs"
INPUT = DeckField.INPUT
OUTPUT = DeckField.OUTPUT

input_deck = Deck(INPUT)
for k, v in native_inputs.items():
input_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_input_var(k, v)))
if DeckField.INPUT in self.deck_fields:
input_deck = Deck(INPUT.value)
for k, v in native_inputs.items():
input_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_input_var(k, v)))

output_deck = Deck(OUTPUT)
for k, v in native_outputs_as_map.items():
output_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_output_var(k, v)))
if DeckField.OUTPUT in self.deck_fields:
output_deck = Deck(OUTPUT.value)
for k, v in native_outputs_as_map.items():
output_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_output_var(k, v)))

if ctx.execution_state and ctx.execution_state.is_local_execution():
# When we run the workflow remotely, flytekit outputs decks at the end of _dispatch_execute
Expand All @@ -681,6 +706,8 @@ def dispatch_execute(
may be none
* ``DynamicJobSpec`` is returned when a dynamic workflow is executed
"""
if DeckField.TIMELINE.value in self.deck_fields and ctx.user_space_params is not None:
ctx.user_space_params.decks.append(ctx.user_space_params.timeline_deck)
# Invoked before the task is executed
new_user_params = self.pre_execute(ctx.user_space_params)

Expand Down Expand Up @@ -791,6 +818,13 @@ def disable_deck(self) -> bool:
"""
return self._disable_deck

@property
def deck_fields(self) -> List[DeckField]:
"""
If not empty, this task will output deck html file for the specified decks
"""
return self._deck_fields


class TaskResolverMixin(object):
"""
Expand Down
12 changes: 10 additions & 2 deletions flytekit/core/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class Builder(object):
execution_date: typing.Optional[datetime] = None
logging: Optional[_logging.Logger] = None
task_id: typing.Optional[_identifier.Identifier] = None
output_metadata_prefix: Optional[str] = None

def __init__(self, current: typing.Optional[ExecutionParameters] = None):
self.stats = current.stats if current else None
Expand All @@ -100,6 +101,7 @@ def __init__(self, current: typing.Optional[ExecutionParameters] = None):
self.attrs = current._attrs if current else {}
self.raw_output_prefix = current.raw_output_prefix if current else None
self.task_id = current.task_id if current else None
self.output_metadata_prefix = current.output_metadata_prefix if current else None

def add_attr(self, key: str, v: typing.Any) -> ExecutionParameters.Builder:
self.attrs[key] = v
Expand All @@ -118,6 +120,7 @@ def build(self) -> ExecutionParameters:
decks=self.decks,
raw_output_prefix=self.raw_output_prefix,
task_id=self.task_id,
output_metadata_prefix=self.output_metadata_prefix,
**self.attrs,
)

Expand Down Expand Up @@ -181,6 +184,7 @@ def __init__(
self._checkpoint = checkpoint
self._decks = decks
self._task_id = task_id
self._timeline_deck = None

@property
def stats(self) -> taggable.TaggableStats:
Expand Down Expand Up @@ -273,16 +277,20 @@ def default_deck(self) -> Deck:

@property
def timeline_deck(self) -> "TimeLineDeck": # type: ignore
from flytekit.deck.deck import TimeLineDeck
from flytekit.deck.deck import DeckField, TimeLineDeck

time_line_deck = None
for deck in self.decks:
if isinstance(deck, TimeLineDeck):
time_line_deck = deck
break
if time_line_deck is None:
time_line_deck = TimeLineDeck("Timeline")
if self._timeline_deck is not None:
time_line_deck = self._timeline_deck
else:
time_line_deck = TimeLineDeck(DeckField.TIMELINE.value, auto_add_to_deck=False)

self._timeline_deck = time_line_deck
return time_line_deck

def __getattr__(self, attr_name: str) -> typing.Any:
Expand Down
16 changes: 9 additions & 7 deletions flytekit/core/python_function_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,20 +352,22 @@ def dynamic_execute(self, task_function: Callable, **kwargs) -> Any:

def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_params):
if self._disable_deck is False:
from flytekit.deck import Deck
from flytekit.deck import Deck, DeckField
from flytekit.deck.renderer import PythonDependencyRenderer

# These errors are raised if the source code can not be retrieved
with suppress(OSError, TypeError):
source_code = inspect.getsource(self._task_function)
from flytekit.deck.renderer import SourceCodeRenderer

source_code_deck = Deck("Source Code")
renderer = SourceCodeRenderer()
source_code_deck.append(renderer.to_html(source_code))
if DeckField.SOURCE_CODE in self.deck_fields:
source_code_deck = Deck(DeckField.SOURCE_CODE.value)
renderer = SourceCodeRenderer()
source_code_deck.append(renderer.to_html(source_code))

python_dependencies_deck = Deck("Dependencies")
renderer = PythonDependencyRenderer()
python_dependencies_deck.append(renderer.to_html())
if DeckField.DEPENDENCIES in self.deck_fields:
python_dependencies_deck = Deck(DeckField.DEPENDENCIES.value)
renderer = PythonDependencyRenderer()
python_dependencies_deck.append(renderer.to_html())

return super()._write_decks(native_inputs, native_outputs_as_map, ctx, new_user_params)
12 changes: 12 additions & 0 deletions flytekit/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from flytekit.core.python_function_task import PythonFunctionTask
from flytekit.core.reference_entity import ReferenceEntity, TaskReference
from flytekit.core.resources import Resources
from flytekit.deck import DeckField
from flytekit.extras.accelerators import BaseAccelerator
from flytekit.image_spec.image_spec import ImageSpec
from flytekit.models.documentation import Documentation
Expand Down Expand Up @@ -114,6 +115,7 @@ def task(
docs: Optional[Documentation] = ...,
disable_deck: Optional[bool] = ...,
enable_deck: Optional[bool] = ...,
deck_fields: Optional[Tuple[DeckField, ...]] = ...,
pod_template: Optional["PodTemplate"] = ...,
pod_template_name: Optional[str] = ...,
accelerator: Optional[BaseAccelerator] = ...,
Expand Down Expand Up @@ -151,6 +153,7 @@ def task(
docs: Optional[Documentation] = ...,
disable_deck: Optional[bool] = ...,
enable_deck: Optional[bool] = ...,
deck_fields: Optional[Tuple[DeckField, ...]] = ...,
pod_template: Optional["PodTemplate"] = ...,
pod_template_name: Optional[str] = ...,
accelerator: Optional[BaseAccelerator] = ...,
Expand Down Expand Up @@ -187,6 +190,13 @@ def task(
docs: Optional[Documentation] = None,
disable_deck: Optional[bool] = None,
enable_deck: Optional[bool] = None,
deck_fields: Optional[Tuple[DeckField, ...]] = (
DeckField.SOURCE_CODE,
DeckField.DEPENDENCIES,
DeckField.TIMELINE,
DeckField.INPUT,
DeckField.OUTPUT,
),
pod_template: Optional["PodTemplate"] = None,
pod_template_name: Optional[str] = None,
accelerator: Optional[BaseAccelerator] = None,
Expand Down Expand Up @@ -307,6 +317,7 @@ def launch_dynamically():
:param task_resolver: Provide a custom task resolver.
:param disable_deck: (deprecated) If true, this task will not output deck html file
:param enable_deck: If true, this task will output deck html file
:param deck_fields: If specified and enble_deck is True, this task will output deck html file with the fields specified in the tuple
:param docs: Documentation about this task
:param pod_template: Custom PodTemplate for this task.
:param pod_template_name: The name of the existing PodTemplate resource which will be used in this task.
Expand Down Expand Up @@ -339,6 +350,7 @@ def wrapper(fn: Callable[..., Any]) -> PythonFunctionTask[T]:
task_resolver=task_resolver,
disable_deck=disable_deck,
enable_deck=enable_deck,
deck_fields=deck_fields,
docs=docs,
pod_template=pod_template,
pod_template_name=pod_template_name,
Expand Down
2 changes: 1 addition & 1 deletion flytekit/deck/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@
SourceCodeRenderer
"""

from .deck import Deck
from .deck import Deck, DeckField
from .renderer import MarkdownRenderer, SourceCodeRenderer, TopFrameRenderer
71 changes: 48 additions & 23 deletions flytekit/deck/deck.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import enum
import os
import typing
from typing import Optional
Expand All @@ -10,6 +11,18 @@
DECK_FILE_NAME = "deck.html"


class DeckField(str, enum.Enum):
"""
DeckField is used to specify the fields that will be rendered in the deck.
"""

INPUT = "Input"
OUTPUT = "Output"
SOURCE_CODE = "Source Code"
TIMELINE = "Timeline"
DEPENDENCIES = "Dependencies"


class Deck:
"""
Deck enable users to get customizable and default visibility into their tasks.
Expand Down Expand Up @@ -52,10 +65,11 @@ def t2() -> Annotated[pd.DataFrame, TopFrameRenderer(10)]:
return iris_df
"""

def __init__(self, name: str, html: Optional[str] = ""):
def __init__(self, name: str, html: Optional[str] = "", auto_add_to_deck: bool = True):
self._name = name
self._html = html
FlyteContextManager.current_context().user_space_params.decks.append(self)
if auto_add_to_deck:
FlyteContextManager.current_context().user_space_params.decks.append(self)

def append(self, html: str) -> "Deck":
assert isinstance(html, str)
Expand All @@ -79,8 +93,8 @@ class TimeLineDeck(Deck):
Instead, the complete data set is used to create a comprehensive visualization of the execution time of each part of the task.
"""

def __init__(self, name: str, html: Optional[str] = ""):
super().__init__(name, html)
def __init__(self, name: str, html: Optional[str] = "", auto_add_to_deck: bool = True):
super().__init__(name, html, auto_add_to_deck)
self.time_info = []

def append_time_info(self, info: dict):
Expand All @@ -89,36 +103,46 @@ def append_time_info(self, info: dict):

@property
def html(self) -> str:
try:
from flytekitplugins.deck.renderer import GanttChartRenderer, TableRenderer
except ImportError:
warning_info = "Plugin 'flytekit-deck-standard' is not installed. To display time line, install the plugin in the image."
logger.warning(warning_info)
return warning_info

if len(self.time_info) == 0:
return ""

import pandas

df = pandas.DataFrame(self.time_info)
note = """
<p><strong>Note:</strong></p>
<ol>
<li>if the time duration is too small(< 1ms), it may be difficult to see on the time line graph.</li>
<li>For accurate execution time measurements, users should refer to wall time and process time.</li>
</ol>
"""
# set the accuracy to microsecond
df["ProcessTime"] = df["ProcessTime"].apply(lambda time: "{:.6f}".format(time))
df["WallTime"] = df["WallTime"].apply(lambda time: "{:.6f}".format(time))

gantt_chart_html = GanttChartRenderer().to_html(df)
time_table_html = TableRenderer().to_html(
df[["Name", "WallTime", "ProcessTime"]],
header_labels=["Name", "Wall Time(s)", "Process Time(s)"],
)
return gantt_chart_html + time_table_html + note
return generate_time_table(self.time_info) + note


def generate_time_table(data: dict) -> str:
html = [
'<table border="1" cellpadding="5" cellspacing="0" style="border-collapse: collapse; width: 100%;">',
"""
<thead>
<tr>
<th>Name</th>
<th>Wall Time(s)</th>
<th>Process Time(s)</th>
</tr>
</thead>
""",
"<tbody>",
]

# Add table rows
for row in data:
html.append("<tr>")
html.append(f"<td>{row['Name']}</td>")
html.append(f"<td>{row['WallTime']:.6f}</td>")
html.append(f"<td>{row['ProcessTime']:.6f}</td>")
html.append("</tr>")
html.append("</tbody>")

html.append("</table>")
return "".join(html)


def _get_deck(
Expand All @@ -129,6 +153,7 @@ def _get_deck(
If ignore_jupyter is set to True, then it will return a str even in a jupyter environment.
"""
deck_map = {deck.name: deck.html for deck in new_user_params.decks}

raw_html = get_deck_template().render(metadata=deck_map)
if not ignore_jupyter and ipython_check():
try:
Expand Down
Loading

0 comments on commit cbaed0b

Please sign in to comment.