Skip to content

Commit

Permalink
fix timelinedeck and remove rendered_deck param
Browse files Browse the repository at this point in the history
Signed-off-by: novahow <[email protected]>
  • Loading branch information
novahow committed Apr 8, 2024
1 parent d504753 commit 09515f6
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 63 deletions.
2 changes: 1 addition & 1 deletion flytekit/bin/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def _dispatch_execute(
logger.info(f"Engine folder written successfully to the output prefix {output_prefix}")

if not getattr(task_def, "disable_deck", True):
_output_deck(task_def.name.split(".")[-1], ctx.user_space_params.with_rendered_decks(task_def.decks).build())
_output_deck(task_def.name.split(".")[-1], ctx.user_space_params)

logger.debug("Finished _dispatch_execute")

Expand Down
21 changes: 9 additions & 12 deletions flytekit/core/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -664,15 +664,13 @@ def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_param
INPUT = DeckFields.INPUT
OUTPUT = DeckFields.OUTPUT

if DeckFields.INPUT in self.decks:
input_deck = Deck(INPUT.value)
for k, v in native_inputs.items():
input_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_input_var(k, v)))
input_deck = Deck(INPUT.value, auto_add_to_deck=DeckFields.INPUT in self.decks)
for k, v in native_inputs.items():
input_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_input_var(k, v)))

if DeckFields.OUTPUT in self.decks:
output_deck = Deck(OUTPUT.value)
for k, v in native_outputs_as_map.items():
output_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_output_var(k, v)))
output_deck = Deck(OUTPUT.value, auto_add_to_deck=DeckFields.OUTPUT in self.decks)
for k, v in native_outputs_as_map.items():
output_deck.append(TypeEngine.to_html(ctx, v, self.get_type_for_output_var(k, v)))

if ctx.execution_state and ctx.execution_state.is_local_execution():
# When we run the workflow remotely, flytekit outputs decks at the end of _dispatch_execute
Expand All @@ -697,6 +695,8 @@ def dispatch_execute(
may be none
* ``DynamicJobSpec`` is returned when a dynamic workflow is executed
"""
if DeckFields.TIMELINE.value in self.decks and ctx.user_space_params is not None:
ctx.user_space_params.decks.append(ctx.user_space_params.timeline_deck)
# Invoked before the task is executed
new_user_params = self.pre_execute(ctx.user_space_params)

Expand Down Expand Up @@ -773,10 +773,7 @@ def pre_execute(self, user_params: Optional[ExecutionParameters]) -> Optional[Ex
This should return either the same context or the mutated context
"""
if user_params is None:
return user_params
new_param = user_params.with_rendered_decks(self.decks).build()
return new_param
return user_params

@abstractmethod
def execute(self, **kwargs) -> Any:
Expand Down
21 changes: 6 additions & 15 deletions flytekit/core/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ class Builder(object):
execution_date: typing.Optional[datetime] = None
logging: Optional[_logging.Logger] = None
task_id: typing.Optional[_identifier.Identifier] = None
rendered_decks: Optional[List[str]] = None
output_metadata_prefix: Optional[str] = None

def __init__(self, current: typing.Optional[ExecutionParameters] = None):
Expand All @@ -103,7 +102,6 @@ def __init__(self, current: typing.Optional[ExecutionParameters] = None):
self.attrs = current._attrs if current else {}
self.raw_output_prefix = current.raw_output_prefix if current else None
self.task_id = current.task_id if current else None
self.rendered_decks = current.rendered_decks if current else []
self.output_metadata_prefix = current.output_metadata_prefix if current else None

def add_attr(self, key: str, v: typing.Any) -> ExecutionParameters.Builder:
Expand All @@ -123,7 +121,6 @@ def build(self) -> ExecutionParameters:
decks=self.decks,
raw_output_prefix=self.raw_output_prefix,
task_id=self.task_id,
rendered_decks=self.rendered_decks,
output_metadata_prefix=self.output_metadata_prefix,
**self.attrs,
)
Expand All @@ -146,11 +143,6 @@ def with_task_sandbox(self) -> Builder:
b.working_dir = task_sandbox_dir
return b

def with_rendered_decks(self, rendered_decks: List[str]) -> Builder:
b = self.new_builder(self)
b.rendered_decks = rendered_decks
return b

def builder(self) -> Builder:
return ExecutionParameters.Builder(current=self)

Expand All @@ -166,7 +158,6 @@ def __init__(
checkpoint=None,
decks=None,
task_id: typing.Optional[_identifier.Identifier] = None,
rendered_decks=None,
**kwargs,
):
"""
Expand Down Expand Up @@ -194,7 +185,7 @@ def __init__(
self._checkpoint = checkpoint
self._decks = decks
self._task_id = task_id
self._rendered_decks = [] if rendered_decks is None else rendered_decks
self._timeline_deck = None

@property
def stats(self) -> taggable.TaggableStats:
Expand Down Expand Up @@ -295,14 +286,14 @@ def timeline_deck(self) -> "TimeLineDeck": # type: ignore
time_line_deck = deck
break
if time_line_deck is None:
time_line_deck = TimeLineDeck(DeckFields.TIMELINE.value)
if self._timeline_deck is not None:
time_line_deck = self._timeline_deck
else:
time_line_deck = TimeLineDeck(DeckFields.TIMELINE.value)

self._timeline_deck = time_line_deck
return time_line_deck

@property
def rendered_decks(self) -> List[str]:
return self._rendered_decks

def __getattr__(self, attr_name: str) -> typing.Any:
"""
This houses certain task specific context. For example in Spark, it houses the SparkSession, etc
Expand Down
8 changes: 6 additions & 2 deletions flytekit/core/python_function_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,15 @@ def _write_decks(self, native_inputs, native_outputs_as_map, ctx, new_user_param
source_code = inspect.getsource(self._task_function)
from flytekit.deck.renderer import SourceCodeRenderer

source_code_deck = Deck(DeckFields.SOURCE_CODE.value)
source_code_deck = Deck(
DeckFields.SOURCE_CODE.value, auto_add_to_deck=DeckFields.SOURCE_CODE in self.decks
)
renderer = SourceCodeRenderer()
source_code_deck.append(renderer.to_html(source_code))

python_dependencies_deck = Deck(DeckFields.DEPENDENCIES.value)
python_dependencies_deck = Deck(
DeckFields.DEPENDENCIES.value, auto_add_to_deck=DeckFields.DEPENDENCIES in self.decks
)
renderer = PythonDependencyRenderer()
python_dependencies_deck.append(renderer.to_html())

Expand Down
17 changes: 6 additions & 11 deletions flytekit/deck/deck.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,11 @@ def t2() -> Annotated[pd.DataFrame, TopFrameRenderer(10)]:
return iris_df
"""

def __init__(self, name: str, html: Optional[str] = ""):
def __init__(self, name: str, html: Optional[str] = "", auto_add_to_deck: bool = True):
self._name = name
self._html = html
FlyteContextManager.current_context().user_space_params.decks.append(self)
if auto_add_to_deck:
FlyteContextManager.current_context().user_space_params.decks.append(self)

def append(self, html: str) -> "Deck":
assert isinstance(html, str)
Expand All @@ -92,8 +93,8 @@ class TimeLineDeck(Deck):
Instead, the complete data set is used to create a comprehensive visualization of the execution time of each part of the task.
"""

def __init__(self, name: str, html: Optional[str] = ""):
super().__init__(name, html)
def __init__(self, name: str, html: Optional[str] = "", auto_add_to_deck: bool = False):
super().__init__(name, html, auto_add_to_deck)
self.time_info = []

def append_time_info(self, info: dict):
Expand Down Expand Up @@ -141,13 +142,7 @@ def _get_deck(
Get flyte deck html string
If ignore_jupyter is set to True, then it will return a str even in a jupyter environment.
"""
deck_members = set([_field.value for _field in DeckFields])
rendered_decks = new_user_params.rendered_decks
deck_map = {
deck.name: deck.html
for deck in new_user_params.decks
if deck.name in rendered_decks or deck.name not in deck_members
}
deck_map = {deck.name: deck.html for deck in new_user_params.decks}

raw_html = get_deck_template().render(metadata=deck_map)
if not ignore_jupyter and ipython_check():
Expand Down
10 changes: 5 additions & 5 deletions tests/flytekit/unit/core/test_flyte_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ def my_wf() -> FlyteFile:
def test_file_handling_remote_file_handling():
SAMPLE_DATA = "https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv"

@task(enable_deck=False)
@task
def t1() -> FlyteFile:
return SAMPLE_DATA

Expand Down Expand Up @@ -312,7 +312,7 @@ def my_wf() -> FlyteFile:
def test_file_handling_remote_file_handling_flyte_file():
SAMPLE_DATA = "https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv"

@task(enable_deck=False)
@task
def t1() -> FlyteFile:
# Unlike the test above, this returns the remote path wrapped in a FlyteFile object
return FlyteFile(SAMPLE_DATA)
Expand Down Expand Up @@ -606,23 +606,23 @@ def test_for_downloading():

@pytest.mark.sandbox_test
def test_file_open_things():
@task(enable_deck=False)
@task
def write_this_file_to_s3() -> FlyteFile:
ctx = FlyteContextManager.current_context()
r = ctx.file_access.get_random_string()
dest = ctx.file_access.join(ctx.file_access.raw_output_prefix, r)
ctx.file_access.put(__file__, dest)
return FlyteFile(path=dest)

@task(enable_deck=False)
@task
def copy_file(ff: FlyteFile) -> FlyteFile:
new_file = FlyteFile.new_remote_file(ff.remote_path)
with ff.open("r") as r:
with new_file.open("w") as w:
w.write(r.read())
return new_file

@task(enable_deck=False)
@task
def print_file(ff: FlyteFile):
with open(ff, "r") as fh:
print(len(fh.readlines()))
Expand Down
3 changes: 3 additions & 0 deletions tests/flytekit/unit/core/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@ def test_timeit():
ctx = FlyteContextManager.current_context()
ctx.user_space_params._decks = []

from flytekit.deck.deck import DeckFields

with timeit("Set disable_deck to False"):
kwargs = {}
kwargs["disable_deck"] = False
kwargs["decks"] = (DeckFields.TIMELINE.value,)

ctx = FlyteContextManager.current_context()
time_info_list = ctx.user_space_params.timeline_deck.time_info
Expand Down
32 changes: 15 additions & 17 deletions tests/flytekit/unit/deck/test_deck.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ def test_timeline_deck():
assert timeline_deck.name == "Timeline"
assert len(timeline_deck.time_info) == 1
assert timeline_deck.time_info[0] == time_info
assert len(ctx.user_space_params.decks) == 1
assert len(ctx.user_space_params.decks) == 0


@pytest.mark.parametrize(
"disable_deck,expected_decks",
[
(None, 1), # time line deck
(False, 3), # time line deck + source code deck + python dependency deck
(True, 1), # time line deck
(None, 0),
(False, 2), # source code deck + python dependency deck
(True, 0),
],
)
def test_deck_for_task(disable_deck, expected_decks):
Expand All @@ -74,14 +74,14 @@ def t1(a: int) -> str:
@pytest.mark.parametrize(
"decks,enable_deck,expected_decks",
[
((), True, 3), # time line deck + source code deck + python dependency deck
((DeckFields.INPUT.value), False, 1), # time line deck
((), True, 0),
((DeckFields.INPUT.value), False, 0),
(
(DeckFields.OUTPUT.value, DeckFields.INPUT.value, DeckFields.TIMELINE.value, DeckFields.DEPENDENCIES.value),
True,
5, # time line deck + source code deck + dependency + input and output decks
4, # time line deck + dependency + input and output decks
),
(None, None, 1), # time line deck + source code deck
(None, True, 2), # source code deck + python dependency deck
],
)
@mock.patch("flytekit.deck.deck._output_deck")
Expand All @@ -100,8 +100,6 @@ def t1(a: int) -> str:

t1(a=3)
assert len(ctx.user_space_params.decks) == expected_decks
if enable_deck is True:
assert len(_output_deck.call_args[0][1].rendered_decks) == len(decks)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -132,21 +130,21 @@ def t1(a: int) -> str:
@pytest.mark.parametrize(
"enable_deck,disable_deck, expected_decks, expect_error",
[
(None, None, 2, False), # default deck and time line deck
(None, None, 1, False), # default deck
(
None,
False,
4,
3,
False,
), # default deck and time line deck + source code deck + python dependency deck
(None, True, 2, False), # default deck and time line deck
), # default deck + source code deck + python dependency deck
(None, True, 1, False), # default deck
(
True,
None,
4,
3,
False,
), # default deck and time line deck + source code deck + python dependency deck
(False, None, 2, False), # default deck and time line deck
), # default deck + source code deck + python dependency deck
(False, None, 1, False), # default deck
(True, True, -1, True), # Set both disable_deck and enable_deck to True and confirm that it fails
(False, False, -1, True), # Set both disable_deck and enable_deck to False and confirm that it fails
],
Expand Down

0 comments on commit 09515f6

Please sign in to comment.