-
Notifications
You must be signed in to change notification settings - Fork 304
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[flyte deck] Streaming Decks #2779
base: master
Are you sure you want to change the base?
Changes from 60 commits
01182b4
9616fc3
98ae7c7
4e92bb0
4df18b5
ebb4d4e
99522d9
c19d67d
67cd829
06da3df
6d99d69
b805cd7
4c97758
7b3574a
9b60564
18c994f
39f39d1
aabcbbb
9ca43f3
ed56352
b559fc9
fc5578f
7139468
e0aee9e
3727588
ce3ee15
d066231
f9387ce
f14c3fa
c33a909
8666c60
93580d6
bcaaabd
a321700
6464fae
884943c
d4b5b96
406227c
1e77f54
d70a2d5
7fc6393
6980140
c87a342
f609760
473ae11
008fe52
f0b9028
d48efa9
6b55930
b5912fb
a681ccd
048fdff
7bcf15e
b6c41c3
be02f9f
cf83e06
e137328
a59a56e
b8383be
dc6d203
41d8760
b71cc19
b5976fe
0c1a5a3
d082456
4a8c68f
b58527b
2764ed4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -129,6 +129,7 @@ class TaskMetadata(object): | |
timeout (Optional[Union[datetime.timedelta, int]]): the max amount of time for which one execution of this task | ||
should be executed for. The execution will be terminated if the runtime exceeds the given timeout | ||
(approximately) | ||
:param bool generates_deck: Whether the task will generate a Deck URI. | ||
pod_template_name (Optional[str]): the name of existing PodTemplate resource in the cluster which will be used in this task. | ||
""" | ||
|
||
|
@@ -141,6 +142,7 @@ class TaskMetadata(object): | |
retries: int = 0 | ||
timeout: Optional[Union[datetime.timedelta, int]] = None | ||
pod_template_name: Optional[str] = None | ||
generates_deck: bool = False | ||
is_eager: bool = False | ||
|
||
def __post_init__(self): | ||
|
@@ -179,6 +181,7 @@ def to_taskmetadata_model(self) -> _task_model.TaskMetadata: | |
discovery_version=self.cache_version, | ||
deprecated_error_message=self.deprecated, | ||
cache_serializable=self.cache_serialize, | ||
generates_deck=self.generates_deck, | ||
pod_template_name=self.pod_template_name, | ||
cache_ignore_input_vars=self.cache_ignore_input_vars, | ||
is_eager=self.is_eager, | ||
|
@@ -720,8 +723,10 @@ def dispatch_execute( | |
may be none | ||
* ``DynamicJobSpec`` is returned when a dynamic workflow is executed | ||
""" | ||
if DeckField.TIMELINE.value in self.deck_fields and ctx.user_space_params is not None: | ||
ctx.user_space_params.decks.append(ctx.user_space_params.timeline_deck) | ||
if not self.disable_deck and ctx.user_space_params is not None: | ||
ctx.user_space_params._enable_deck = True # type: ignore | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok I'll move it to the builder, thank you |
||
if DeckField.TIMELINE.value in self.deck_fields: | ||
ctx.user_space_params.decks.append(ctx.user_space_params.timeline_deck) | ||
# Invoked before the task is executed | ||
new_user_params = self.pre_execute(ctx.user_space_params) | ||
|
||
|
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -41,10 +41,6 @@ class Deck: | |||||||||||
scatter plots or Markdown text. In addition, users can create new decks to render | ||||||||||||
their data with custom renderers. | ||||||||||||
|
||||||||||||
.. warning:: | ||||||||||||
|
||||||||||||
This feature is in beta. | ||||||||||||
|
||||||||||||
.. code-block:: python | ||||||||||||
|
||||||||||||
iris_df = px.data.iris() | ||||||||||||
|
@@ -86,6 +82,12 @@ def name(self) -> str: | |||||||||||
def html(self) -> str: | ||||||||||||
return self._html | ||||||||||||
|
||||||||||||
@staticmethod | ||||||||||||
def publish(): | ||||||||||||
params = FlyteContextManager.current_context().user_space_params | ||||||||||||
task_name = params.task_id.name | ||||||||||||
_output_deck(task_name=task_name, new_user_params=params) | ||||||||||||
pingsutw marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
|
||||||||||||
|
||||||||||||
class TimeLineDeck(Deck): | ||||||||||||
""" | ||||||||||||
|
@@ -148,7 +150,8 @@ def generate_time_table(data: dict) -> str: | |||||||||||
|
||||||||||||
|
||||||||||||
def _get_deck( | ||||||||||||
new_user_params: ExecutionParameters, ignore_jupyter: bool = False | ||||||||||||
new_user_params: ExecutionParameters, | ||||||||||||
ignore_jupyter: bool = False, | ||||||||||||
) -> typing.Union[str, "IPython.core.display.HTML"]: # type:ignore | ||||||||||||
""" | ||||||||||||
Get flyte deck html string | ||||||||||||
|
@@ -176,11 +179,17 @@ def _get_deck( | |||||||||||
|
||||||||||||
def _output_deck(task_name: str, new_user_params: ExecutionParameters): | ||||||||||||
ctx = FlyteContext.current_context() | ||||||||||||
params = ctx.user_space_params | ||||||||||||
|
||||||||||||
if not params.enable_deck: | ||||||||||||
logger.warning("Deck is disabled for this task, please don't call Deck.publish()") | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider adding None check for params
Consider checking Code suggestionCheck the AI-generated fix before applying
Suggested change
Code Review Run #6220fd Is this a valid issue, or was it incorrectly flagged by the Agent?
|
||||||||||||
return | ||||||||||||
|
||||||||||||
local_dir = ctx.file_access.get_random_local_directory() | ||||||||||||
local_path = f"{local_dir}{os.sep}{DECK_FILE_NAME}" | ||||||||||||
try: | ||||||||||||
with open(local_path, "w", encoding="utf-8") as f: | ||||||||||||
f.write(_get_deck(new_user_params, ignore_jupyter=True)) | ||||||||||||
f.write(_get_deck(new_user_params=new_user_params, ignore_jupyter=True)) | ||||||||||||
logger.info(f"{task_name} task creates flyte deck html to file://{local_path}") | ||||||||||||
if ctx.execution_state.mode == ExecutionState.Mode.TASK_EXECUTION: | ||||||||||||
fs = ctx.file_access.get_filesystem_for_path(new_user_params.output_metadata_prefix) | ||||||||||||
|
@@ -197,6 +206,7 @@ def _output_deck(task_name: str, new_user_params: ExecutionParameters): | |||||||||||
def get_deck_template() -> Template: | ||||||||||||
root = os.path.dirname(os.path.abspath(__file__)) | ||||||||||||
templates_dir = os.path.join(root, "html", "template.html") | ||||||||||||
|
||||||||||||
with open(templates_dir, "r") as f: | ||||||||||||
template_content = f.read() | ||||||||||||
return Template(template_content) |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -180,6 +180,7 @@ def __init__( | |||||
pod_template_name, | ||||||
cache_ignore_input_vars, | ||||||
is_eager: bool = False, | ||||||
generates_deck: bool = False, | ||||||
): | ||||||
""" | ||||||
Information needed at runtime to determine behavior such as whether or not outputs are discoverable, timeouts, | ||||||
|
@@ -199,6 +200,7 @@ def __init__( | |||||
receive deprecation warnings. | ||||||
:param bool cache_serializable: Whether or not caching operations are executed in serial. This means only a | ||||||
single instance over identical inputs is executed, other concurrent executions wait for the cached results. | ||||||
:param bool generates_deck: Whether the task will generate a Deck URI. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
:param pod_template_name: The name of the existing PodTemplate resource which will be used in this task. | ||||||
:param cache_ignore_input_vars: Input variables that should not be included when calculating hash for cache. | ||||||
:param is_eager: | ||||||
|
@@ -214,6 +216,7 @@ def __init__( | |||||
self._pod_template_name = pod_template_name | ||||||
self._cache_ignore_input_vars = cache_ignore_input_vars | ||||||
self._is_eager = is_eager | ||||||
self._generates_deck = generates_deck | ||||||
|
||||||
@property | ||||||
def is_eager(self): | ||||||
|
@@ -295,6 +298,14 @@ def pod_template_name(self): | |||||
""" | ||||||
return self._pod_template_name | ||||||
|
||||||
@property | ||||||
def generates_deck(self) -> bool: | ||||||
""" | ||||||
Whether the task will generate a Deck URI. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
:rtype: bool | ||||||
""" | ||||||
return self._generates_deck | ||||||
|
||||||
@property | ||||||
def cache_ignore_input_vars(self): | ||||||
""" | ||||||
|
@@ -315,6 +326,7 @@ def to_flyte_idl(self): | |||||
discovery_version=self.discovery_version, | ||||||
deprecated_error_message=self.deprecated_error_message, | ||||||
cache_serializable=self.cache_serializable, | ||||||
generates_deck=self.generates_deck, | ||||||
pod_template_name=self.pod_template_name, | ||||||
cache_ignore_input_vars=self.cache_ignore_input_vars, | ||||||
is_eager=self.is_eager, | ||||||
|
@@ -338,6 +350,7 @@ def from_flyte_idl(cls, pb2_object: _core_task.TaskMetadata): | |||||
discovery_version=pb2_object.discovery_version, | ||||||
deprecated_error_message=pb2_object.deprecated_error_message, | ||||||
cache_serializable=pb2_object.cache_serializable, | ||||||
generates_deck=pb2_object.generates_deck, | ||||||
pod_template_name=pb2_object.pod_template_name, | ||||||
cache_ignore_input_vars=pb2_object.cache_ignore_input_vars, | ||||||
is_eager=pb2_object.is_eager, | ||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -185,11 +185,13 @@ def get_serializable_task( | |||||
entity.reset_command_fn() | ||||||
|
||||||
entity_config = entity.get_config(settings) or {} | ||||||
|
||||||
extra_config = {} | ||||||
|
||||||
if hasattr(entity, "task_function") and isinstance(entity.task_function, ClassDecorator): | ||||||
extra_config = entity.task_function.get_extra_config() | ||||||
if hasattr(entity, "task_function"): | ||||||
if isinstance(entity.task_function, ClassDecorator): | ||||||
extra_config = entity.task_function.get_extra_config() | ||||||
if not entity.disable_deck: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
entity.metadata.generates_deck = True | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we add this to a unit test? |
||||||
|
||||||
merged_config = {**entity_config, **extra_config} | ||||||
|
||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit