Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flyte deck] Streaming Decks #2779

Open
wants to merge 68 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 60 commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
01182b4
[Flyte Decl] Streaming Decks
Future-Outlier Oct 1, 2024
9616fc3
print
Future-Outlier Oct 1, 2024
98ae7c7
sleep more
Future-Outlier Oct 1, 2024
4e92bb0
add dummy deck
Future-Outlier Oct 2, 2024
4df18b5
nit
Future-Outlier Oct 2, 2024
ebb4d4e
dummy deck
Future-Outlier Oct 2, 2024
99522d9
update
Future-Outlier Oct 2, 2024
c19d67d
nit
Future-Outlier Oct 2, 2024
67cd829
test
Future-Outlier Oct 2, 2024
06da3df
return html
Future-Outlier Oct 2, 2024
6d99d69
Change Deck
Future-Outlier Oct 2, 2024
b805cd7
fix
Future-Outlier Oct 2, 2024
4c97758
fix recursion error
Future-Outlier Oct 2, 2024
7b3574a
remove redundant code
Future-Outlier Oct 2, 2024
9b60564
add dummy deck to deck init
Future-Outlier Oct 2, 2024
18c994f
Better Dummy Deck Logic
Future-Outlier Oct 2, 2024
39f39d1
Merge branch 'master' into flytekit-streaming-deck
Future-Outlier Oct 7, 2024
aabcbbb
Deck Publish
Future-Outlier Oct 7, 2024
9ca43f3
Merge branch 'master' into flytekit-streaming-deck
Future-Outlier Nov 25, 2024
ed56352
litn
Future-Outlier Nov 27, 2024
b559fc9
remove dummy deck
Future-Outlier Nov 27, 2024
fc5578f
nit
Future-Outlier Nov 27, 2024
7139468
use auto refresh tab, 5 seconds as interval
Future-Outlier Dec 2, 2024
e0aee9e
revert
Future-Outlier Dec 2, 2024
3727588
test setDynamicTabs
Future-Outlier Dec 2, 2024
ce3ee15
change interval time
Future-Outlier Dec 2, 2024
d066231
test
Future-Outlier Dec 2, 2024
f9387ce
revert
Future-Outlier Dec 2, 2024
f14c3fa
test
Future-Outlier Dec 2, 2024
c33a909
nit
Future-Outlier Dec 2, 2024
8666c60
try dynamic containers
Future-Outlier Dec 2, 2024
93580d6
try dynamic containers v2
Future-Outlier Dec 2, 2024
bcaaabd
try dynamic containers v3
Future-Outlier Dec 2, 2024
a321700
debug
Future-Outlier Dec 2, 2024
6464fae
update
Future-Outlier Dec 2, 2024
884943c
nit
Future-Outlier Dec 3, 2024
d4b5b96
Refresh Botton
Future-Outlier Dec 3, 2024
406227c
fix
Future-Outlier Dec 3, 2024
1e77f54
lint
Future-Outlier Dec 3, 2024
d70a2d5
Merge branch 'master' into flytekit-streaming-deck
Future-Outlier Dec 3, 2024
7fc6393
test new refresh
Future-Outlier Dec 3, 2024
6980140
lint
Future-Outlier Dec 3, 2024
c87a342
Revert back html code, collaborating with Lyon
Future-Outlier Dec 5, 2024
f609760
lint
Future-Outlier Dec 5, 2024
473ae11
nit
Future-Outlier Dec 5, 2024
008fe52
Merge branch 'master' into flytekit-streaming-deck
Future-Outlier Dec 12, 2024
f0b9028
nit
Future-Outlier Dec 16, 2024
d48efa9
update
Future-Outlier Dec 17, 2024
6b55930
better code
Future-Outlier Dec 17, 2024
b5912fb
update
Future-Outlier Dec 17, 2024
a681ccd
some notes for giving user params builder deck enabled
Future-Outlier Dec 18, 2024
048fdff
update
Future-Outlier Jan 2, 2025
7bcf15e
raise error when disabled deck and called Deck.publish()
Future-Outlier Jan 2, 2025
b6c41c3
lint
Future-Outlier Jan 2, 2025
be02f9f
lint
Future-Outlier Jan 2, 2025
cf83e06
update
Future-Outlier Jan 8, 2025
e137328
static method by YEE
Future-Outlier Jan 9, 2025
a59a56e
make Deck.publish more like a wrapper by moving enable deck checking …
Future-Outlier Jan 9, 2025
b8383be
lint
Future-Outlier Jan 9, 2025
dc6d203
print monodocs err
Future-Outlier Jan 9, 2025
41d8760
Fix monodocs
Future-Outlier Jan 9, 2025
b71cc19
use builder
Future-Outlier Jan 9, 2025
b5976fe
add translator test for deck serialization settings
Future-Outlier Jan 9, 2025
0c1a5a3
update
Future-Outlier Jan 13, 2025
d082456
fix
Future-Outlier Jan 13, 2025
4a8c68f
test
Future-Outlier Jan 13, 2025
b58527b
update
Future-Outlier Jan 13, 2025
2764ed4
remove blank
Future-Outlier Jan 13, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/monodocs_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,4 @@ jobs:
DOCSEARCH_API_KEY: fake_docsearch_api_key # must be set to get doc build to succeed
run: |
conda activate monodocs-env
make -C docs clean html SPHINXOPTS="-W -vvv"
make -C docs clean html SPHINXOPTS="-W"
2 changes: 1 addition & 1 deletion flytekit/bin/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ def _dispatch_execute(
logger.info(f"Engine folder written successfully to the output prefix {output_prefix}")

if task_def is not None and not getattr(task_def, "disable_deck", True):
_output_deck(task_def.name.split(".")[-1], ctx.user_space_params)
_output_deck(task_name=task_def.name.split(".")[-1], new_user_params=ctx.user_space_params)

logger.debug("Finished _dispatch_execute")

Expand Down
9 changes: 7 additions & 2 deletions flytekit/core/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ class TaskMetadata(object):
timeout (Optional[Union[datetime.timedelta, int]]): the max amount of time for which one execution of this task
should be executed for. The execution will be terminated if the runtime exceeds the given timeout
(approximately)
:param bool generates_deck: Whether the task will generate a Deck URI.
pod_template_name (Optional[str]): the name of existing PodTemplate resource in the cluster which will be used in this task.
"""

Expand All @@ -141,6 +142,7 @@ class TaskMetadata(object):
retries: int = 0
timeout: Optional[Union[datetime.timedelta, int]] = None
pod_template_name: Optional[str] = None
generates_deck: bool = False
is_eager: bool = False

def __post_init__(self):
Expand Down Expand Up @@ -179,6 +181,7 @@ def to_taskmetadata_model(self) -> _task_model.TaskMetadata:
discovery_version=self.cache_version,
deprecated_error_message=self.deprecated,
cache_serializable=self.cache_serialize,
generates_deck=self.generates_deck,
pod_template_name=self.pod_template_name,
cache_ignore_input_vars=self.cache_ignore_input_vars,
is_eager=self.is_eager,
Expand Down Expand Up @@ -720,8 +723,10 @@ def dispatch_execute(
may be none
* ``DynamicJobSpec`` is returned when a dynamic workflow is executed
"""
if DeckField.TIMELINE.value in self.deck_fields and ctx.user_space_params is not None:
ctx.user_space_params.decks.append(ctx.user_space_params.timeline_deck)
if not self.disable_deck and ctx.user_space_params is not None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
if not self.disable_deck and ctx.user_space_params is not None:
if self.enable_deck and ctx.user_space_params is not None:

ctx.user_space_params._enable_deck = True # type: ignore
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since the ExecutionParams object has a builder, maybe we should use it here instead of accessing a private field?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok I'll move it to the builder, thank you

if DeckField.TIMELINE.value in self.deck_fields:
ctx.user_space_params.decks.append(ctx.user_space_params.timeline_deck)
# Invoked before the task is executed
new_user_params = self.pre_execute(ctx.user_space_params)

Expand Down
8 changes: 8 additions & 0 deletions flytekit/core/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class Builder(object):
logging: Optional[_logging.Logger] = None
task_id: typing.Optional[_identifier.Identifier] = None
output_metadata_prefix: Optional[str] = None
enable_deck: bool = False

def __init__(self, current: typing.Optional[ExecutionParameters] = None):
self.stats = current.stats if current else None
Expand Down Expand Up @@ -125,6 +126,7 @@ def build(self) -> ExecutionParameters:
raw_output_prefix=self.raw_output_prefix,
task_id=self.task_id,
output_metadata_prefix=self.output_metadata_prefix,
enable_deck=self.enable_deck,
**self.attrs,
)

Expand Down Expand Up @@ -161,6 +163,7 @@ def __init__(
checkpoint=None,
decks=None,
task_id: typing.Optional[_identifier.Identifier] = None,
enable_deck: bool = False,
**kwargs,
):
"""
Expand Down Expand Up @@ -188,8 +191,13 @@ def __init__(
self._checkpoint = checkpoint
self._decks = decks
self._task_id = task_id
self._enable_deck = enable_deck
self._timeline_deck = None

@property
def enable_deck(self) -> bool:
return self._enable_deck

@property
def stats(self) -> taggable.TaggableStats:
"""
Expand Down
22 changes: 16 additions & 6 deletions flytekit/deck/deck.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ class Deck:
scatter plots or Markdown text. In addition, users can create new decks to render
their data with custom renderers.

.. warning::

This feature is in beta.

.. code-block:: python

iris_df = px.data.iris()
Expand Down Expand Up @@ -86,6 +82,12 @@ def name(self) -> str:
def html(self) -> str:
return self._html

@staticmethod
def publish():
params = FlyteContextManager.current_context().user_space_params
task_name = params.task_id.name
_output_deck(task_name=task_name, new_user_params=params)
pingsutw marked this conversation as resolved.
Show resolved Hide resolved


class TimeLineDeck(Deck):
"""
Expand Down Expand Up @@ -148,7 +150,8 @@ def generate_time_table(data: dict) -> str:


def _get_deck(
new_user_params: ExecutionParameters, ignore_jupyter: bool = False
new_user_params: ExecutionParameters,
ignore_jupyter: bool = False,
) -> typing.Union[str, "IPython.core.display.HTML"]: # type:ignore
"""
Get flyte deck html string
Expand Down Expand Up @@ -176,11 +179,17 @@ def _get_deck(

def _output_deck(task_name: str, new_user_params: ExecutionParameters):
ctx = FlyteContext.current_context()
params = ctx.user_space_params

if not params.enable_deck:
logger.warning("Deck is disabled for this task, please don't call Deck.publish()")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding None check for params

Consider checking params for None before accessing enable_deck attribute to avoid potential NoneType attribute errors.

Code suggestion
Check the AI-generated fix before applying
Suggested change
if not params.enable_deck:
logger.warning("Deck is disabled for this task, please don't call Deck.publish()")
if params is None or not params.enable_deck:
logger.warning("Deck is disabled for this task or context params are not available, please don't call Deck.publish()")
return

Code Review Run #6220fd


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

return

local_dir = ctx.file_access.get_random_local_directory()
local_path = f"{local_dir}{os.sep}{DECK_FILE_NAME}"
try:
with open(local_path, "w", encoding="utf-8") as f:
f.write(_get_deck(new_user_params, ignore_jupyter=True))
f.write(_get_deck(new_user_params=new_user_params, ignore_jupyter=True))
logger.info(f"{task_name} task creates flyte deck html to file://{local_path}")
if ctx.execution_state.mode == ExecutionState.Mode.TASK_EXECUTION:
fs = ctx.file_access.get_filesystem_for_path(new_user_params.output_metadata_prefix)
Expand All @@ -197,6 +206,7 @@ def _output_deck(task_name: str, new_user_params: ExecutionParameters):
def get_deck_template() -> Template:
root = os.path.dirname(os.path.abspath(__file__))
templates_dir = os.path.join(root, "html", "template.html")

with open(templates_dir, "r") as f:
template_content = f.read()
return Template(template_content)
13 changes: 13 additions & 0 deletions flytekit/models/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ def __init__(
pod_template_name,
cache_ignore_input_vars,
is_eager: bool = False,
generates_deck: bool = False,
):
"""
Information needed at runtime to determine behavior such as whether or not outputs are discoverable, timeouts,
Expand All @@ -199,6 +200,7 @@ def __init__(
receive deprecation warnings.
:param bool cache_serializable: Whether or not caching operations are executed in serial. This means only a
single instance over identical inputs is executed, other concurrent executions wait for the cached results.
:param bool generates_deck: Whether the task will generate a Deck URI.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
:param bool generates_deck: Whether the task will generate a Deck URI.
:param bool generates_deck: Whether the task will generate a Deck.

:param pod_template_name: The name of the existing PodTemplate resource which will be used in this task.
:param cache_ignore_input_vars: Input variables that should not be included when calculating hash for cache.
:param is_eager:
Expand All @@ -214,6 +216,7 @@ def __init__(
self._pod_template_name = pod_template_name
self._cache_ignore_input_vars = cache_ignore_input_vars
self._is_eager = is_eager
self._generates_deck = generates_deck

@property
def is_eager(self):
Expand Down Expand Up @@ -295,6 +298,14 @@ def pod_template_name(self):
"""
return self._pod_template_name

@property
def generates_deck(self) -> bool:
"""
Whether the task will generate a Deck URI.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Whether the task will generate a Deck URI.
Whether the task will generate a Deck.

:rtype: bool
"""
return self._generates_deck

@property
def cache_ignore_input_vars(self):
"""
Expand All @@ -315,6 +326,7 @@ def to_flyte_idl(self):
discovery_version=self.discovery_version,
deprecated_error_message=self.deprecated_error_message,
cache_serializable=self.cache_serializable,
generates_deck=self.generates_deck,
pod_template_name=self.pod_template_name,
cache_ignore_input_vars=self.cache_ignore_input_vars,
is_eager=self.is_eager,
Expand All @@ -338,6 +350,7 @@ def from_flyte_idl(cls, pb2_object: _core_task.TaskMetadata):
discovery_version=pb2_object.discovery_version,
deprecated_error_message=pb2_object.deprecated_error_message,
cache_serializable=pb2_object.cache_serializable,
generates_deck=pb2_object.generates_deck,
pod_template_name=pb2_object.pod_template_name,
cache_ignore_input_vars=pb2_object.cache_ignore_input_vars,
is_eager=pb2_object.is_eager,
Expand Down
8 changes: 5 additions & 3 deletions flytekit/tools/translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,13 @@ def get_serializable_task(
entity.reset_command_fn()

entity_config = entity.get_config(settings) or {}

extra_config = {}

if hasattr(entity, "task_function") and isinstance(entity.task_function, ClassDecorator):
extra_config = entity.task_function.get_extra_config()
if hasattr(entity, "task_function"):
if isinstance(entity.task_function, ClassDecorator):
extra_config = entity.task_function.get_extra_config()
if not entity.disable_deck:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if not entity.disable_deck:
if entity.enable_deck:

entity.metadata.generates_deck = True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add this to a unit test?


merged_config = {**entity_config, **extra_config}

Expand Down
Loading