From 36ccce5e24691014c54ef6e0721291b1d762087b Mon Sep 17 00:00:00 2001 From: Ville Tuulos Date: Sat, 5 Aug 2023 23:28:10 -0700 Subject: [PATCH] [current.card.refresh] refactor cards to support runtime updates --- metaflow/plugins/cards/card_cli.py | 100 +++++++++++++--- metaflow/plugins/cards/card_datastore.py | 40 +++++-- metaflow/plugins/cards/card_decorator.py | 107 ++++++++++++++---- metaflow/plugins/cards/card_modules/card.py | 12 +- .../plugins/cards/component_serializer.py | 77 ++++++++++++- 5 files changed, 274 insertions(+), 62 deletions(-) diff --git a/metaflow/plugins/cards/card_cli.py b/metaflow/plugins/cards/card_cli.py index 2c0c8017744..ce2509b0d6f 100644 --- a/metaflow/plugins/cards/card_cli.py +++ b/metaflow/plugins/cards/card_cli.py @@ -6,7 +6,9 @@ from metaflow._vendor import click import os import json +import uuid import signal +import inspect import random from contextlib import contextmanager from functools import wraps @@ -375,14 +377,20 @@ def wrapper(*args, **kwargs): return wrapper -def render_card(mf_card, task, timeout_value=None): - rendered_info = None +def update_card(mf_card, mode, task, data, timeout_value=None): + def _call(): + # compatibility with old render()-method that doesn't accept the data arg + new_render = "data" in inspect.getfullargspec(mf_card.render).args + if mode == "render" and not new_render: + return mf_card.render(task) + else: + return getattr(mf_card, mode)(task, data=data) + if timeout_value is None or timeout_value < 0: - rendered_info = mf_card.render(task) + return _call() else: with timeout(timeout_value): - rendered_info = mf_card.render(task) - return rendered_info + return _call() @card.command(help="create a HTML card") @@ -414,29 +422,61 @@ def render_card(mf_card, task, timeout_value=None): is_flag=True, help="Upon failing to render a card, render a card holding the stack trace", ) +@click.option( + "--id", + default=None, + show_default=True, + type=str, + help="ID of the card", +) @click.option( "--component-file", default=None, show_default=True, type=str, - help="JSON File with Pre-rendered components.(internal)", + help="JSON File with Pre-rendered components. (internal)", ) @click.option( - "--id", + "--mode", + default="render", + show_default=True, + type=str, + help="Rendering mode. (internal)", +) +@click.option( + "--data-file", default=None, show_default=True, type=str, - help="ID of the card", + help="JSON file containing data to be updated. (internal)", +) +@click.option( + "--card-uuid", + default=None, + show_default=True, + type=str, + help="Card UUID. (internal)", +) +@click.option( + "--delete-input-files", + default=False, + is_flag=True, + show_default=True, + help="Delete data-file and compontent-file after reading. (internal)", ) @click.pass_context def create( ctx, pathspec, + mode=None, type=None, options=None, timeout=None, component_file=None, + data_file=None, render_error_card=False, + card_uuid=None, + delete_input_files=None, id=None, ): card_id = id @@ -452,11 +492,26 @@ def create( graph_dict, _ = ctx.obj.graph.output_steps() + if card_uuid is None: + card_uuid = str(uuid.uuid4()).replace("-", "") + # Components are rendered in a Step and added via `current.card.append` are added here. component_arr = [] if component_file is not None: with open(component_file, "r") as f: component_arr = json.load(f) + # data is passed in as temporary files which can be deleted after use + if delete_input_files: + os.remove(component_file) + + # Load data to be refreshed for runtime cards + data = {} + if data_file is not None: + with open(data_file, "r") as f: + data = json.load(f) + # data is passed in as temporary files which can be deleted after use + if delete_input_files: + os.remove(data_file) task = Task(full_pathspec) from metaflow.plugins import CARDS @@ -500,7 +555,9 @@ def create( if mf_card: try: - rendered_info = render_card(mf_card, task, timeout_value=timeout) + rendered_info = update_card( + mf_card, mode, task, data, timeout_value=timeout + ) except: if render_error_card: error_stack_trace = str(UnrenderableCardException(type, options)) @@ -508,10 +565,10 @@ def create( raise UnrenderableCardException(type, options) # - if error_stack_trace is not None: + if error_stack_trace is not None and mode != "refresh": rendered_info = error_card().render(task, stack_trace=error_stack_trace) - if rendered_info is None and render_error_card: + if rendered_info is None and render_error_card and mode != "refresh": rendered_info = error_card().render( task, stack_trace="No information rendered From card of type %s" % type ) @@ -532,12 +589,20 @@ def create( card_id = None if rendered_info is not None: - card_info = card_datastore.save_card(save_type, rendered_info, card_id=card_id) - ctx.obj.echo( - "Card created with type: %s and hash: %s" - % (card_info.type, card_info.hash[:NUM_SHORT_HASH_CHARS]), - fg="green", - ) + if mode == "refresh": + card_datastore.save_data( + card_uuid, save_type, rendered_info, card_id=card_id + ) + ctx.obj.echo("Data updated", fg="green") + else: + card_info = card_datastore.save_card( + card_uuid, save_type, rendered_info, card_id=card_id + ) + ctx.obj.echo( + "Card created with type: %s and hash: %s" + % (card_info.type, card_info.hash[:NUM_SHORT_HASH_CHARS]), + fg="green", + ) @card.command() @@ -655,7 +720,6 @@ def list( as_json=False, file=None, ): - card_id = id if pathspec is None: list_many_cards( diff --git a/metaflow/plugins/cards/card_datastore.py b/metaflow/plugins/cards/card_datastore.py index 59931592878..42ec99ca6ab 100644 --- a/metaflow/plugins/cards/card_datastore.py +++ b/metaflow/plugins/cards/card_datastore.py @@ -6,6 +6,7 @@ from hashlib import sha1 from io import BytesIO import os +import json import shutil from metaflow.plugins.datastores.local_storage import LocalStorage @@ -88,15 +89,15 @@ def __init__(self, flow_datastore, pathspec=None): self._temp_card_save_path = self._get_write_path(base_pth=TEMP_DIR_NAME) @classmethod - def get_card_location(cls, base_path, card_name, card_html, card_id=None): - chash = sha1(bytes(card_html, "utf-8")).hexdigest() + def get_card_location(cls, base_path, card_name, uuid, card_id=None, suffix="html"): + chash = uuid if card_id is None: - card_file_name = "%s-%s.html" % (card_name, chash) + card_file_name = "%s-%s.%s" % (card_name, chash, suffix) else: - card_file_name = "%s-%s-%s.html" % (card_name, card_id, chash) + card_file_name = "%s-%s-%s.%s" % (card_name, card_id, chash, suffix) return os.path.join(base_path, card_file_name) - def _make_path(self, base_pth, pathspec=None, with_steps=False): + def _make_path(self, base_pth, pathspec=None, with_steps=False, suffix="cards"): sysroot = base_pth if pathspec is not None: # since most cards are at a task level there will always be 4 non-none values returned @@ -121,7 +122,7 @@ def _make_path(self, base_pth, pathspec=None, with_steps=False): step_name, "tasks", task_id, - "cards", + suffix, ] else: pth_arr = [ @@ -131,14 +132,16 @@ def _make_path(self, base_pth, pathspec=None, with_steps=False): run_id, "tasks", task_id, - "cards", + suffix, ] if sysroot == "" or sysroot is None: pth_arr.pop(0) return os.path.join(*pth_arr) - def _get_write_path(self, base_pth=""): - return self._make_path(base_pth, pathspec=self._pathspec, with_steps=True) + def _get_write_path(self, base_pth="", suffix="cards"): + return self._make_path( + base_pth, pathspec=self._pathspec, with_steps=True, suffix=suffix + ) def _get_read_path(self, base_pth="", with_steps=False): return self._make_path(base_pth, pathspec=self._pathspec, with_steps=with_steps) @@ -173,7 +176,20 @@ def card_info_from_path(path): card_hash = card_hash.split(".html")[0] return CardInfo(card_type, card_hash, card_id, card_file_name) - def save_card(self, card_type, card_html, card_id=None, overwrite=True): + def save_data(self, uuid, card_type, json_data, card_id=None): + card_file_name = card_type + loc = self.get_card_location( + self._get_write_path(suffix="runtime"), + card_file_name, + uuid, + card_id=card_id, + suffix="data.json", + ) + self._backend.save_bytes( + [(loc, BytesIO(json.dumps(json_data).encode("utf-8")))], overwrite=True + ) + + def save_card(self, uuid, card_type, card_html, card_id=None, overwrite=True): card_file_name = card_type # TEMPORARY_WORKAROUND: FIXME (LATER) : Fix the duplication of below block in a few months. # Check file blame to understand the age of this temporary workaround. @@ -193,7 +209,7 @@ def save_card(self, card_type, card_html, card_id=None, overwrite=True): # It will also easily end up breaking the metaflow-ui (which maybe using a client from an older version). # Hence, we are writing cards to both paths so that we can introduce breaking changes later in the future. card_path_with_steps = self.get_card_location( - self._get_write_path(), card_file_name, card_html, card_id=card_id + self._get_write_path(), card_file_name, uuid, card_id=card_id ) if SKIP_CARD_DUALWRITE: self._backend.save_bytes( @@ -204,7 +220,7 @@ def save_card(self, card_type, card_html, card_id=None, overwrite=True): card_path_without_steps = self.get_card_location( self._get_read_path(with_steps=False), card_file_name, - card_html, + uuid, card_id=card_id, ) for cp in [card_path_with_steps, card_path_without_steps]: diff --git a/metaflow/plugins/cards/card_decorator.py b/metaflow/plugins/cards/card_decorator.py index efa13a1ec90..c629d260c00 100644 --- a/metaflow/plugins/cards/card_decorator.py +++ b/metaflow/plugins/cards/card_decorator.py @@ -2,6 +2,7 @@ import os import tempfile import sys +import time import json from typing import Dict, Any @@ -17,6 +18,8 @@ from .exception import CARD_ID_PATTERN, TYPE_CHECK_REGEX +ASYNC_TIMEOUT = 30 + def warning_message(message, logger=None, ts=False): msg = "[@card WARNING] %s" % message @@ -69,6 +72,7 @@ def __init__(self, *args, **kwargs): self._is_editable = False self._card_uuid = None self._user_set_card_id = None + self._async_proc = None def _is_event_registered(self, evt_name): return evt_name in self._called_once @@ -89,7 +93,6 @@ def _increment_step_counter(cls): def step_init( self, flow, graph, step_name, decorators, environment, flow_datastore, logger ): - self._flow_datastore = flow_datastore self._environment = environment self._logger = logger @@ -131,6 +134,8 @@ def task_pre_step( if card_class is not None: # Card type was not found if card_class.ALLOW_USER_COMPONENTS: self._is_editable = True + self._is_runtime_card = card_class.IS_RUNTIME_CARD + # We have a step counter to ensure that on calling the final card decorator's `task_pre_step` # we call a `finalize` function in the `CardComponentCollector`. # This can help ensure the behaviour of the `current.card` object is according to specification. @@ -155,7 +160,9 @@ def task_pre_step( # we need to ensure that `current.card` has `CardComponentCollector` instantiated only once. if not self._is_event_registered("pre-step"): self._register_event("pre-step") - current._update_env({"card": CardComponentCollector(self._logger)}) + current._update_env( + {"card": CardComponentCollector(self._logger, self._card_proc)} + ) # this line happens because of decospecs parsing. customize = False @@ -184,9 +191,22 @@ def task_finished( ): if not is_task_ok: return - component_strings = current.card._serialize_components(self._card_uuid) + return self._card_proc("render") + + def _card_proc(self, mode): + if mode != "render" and not self._is_runtime_card: + # silently ignore runtime updates for cards that don't support them + return + elif mode == "refresh": + # don't serialize components, which can be a somewhat expensive operation, + # if we are just updating data + component_strings = [] + else: + component_strings = current.card._serialize_components(self._card_uuid) + + data = current.card._get_latest_data(self._card_uuid) runspec = "/".join([current.run_id, current.step_name, current.task_id]) - self._run_cards_subprocess(runspec, component_strings) + self._run_cards_subprocess(mode, runspec, component_strings, data) @staticmethod def _options(mapping): @@ -200,7 +220,6 @@ def _options(mapping): yield to_unicode(value) def _create_top_level_args(self): - top_level_options = { "quiet": True, "metadata": self._metadata.TYPE, @@ -215,12 +234,23 @@ def _create_top_level_args(self): } return list(self._options(top_level_options)) - def _run_cards_subprocess(self, runspec, component_strings): - temp_file = None + def _run_cards_subprocess(self, mode, runspec, component_strings, data=None): + components_file = data_file = None + wait = mode == "render" + if len(component_strings) > 0: - temp_file = tempfile.NamedTemporaryFile("w", suffix=".json") - json.dump(component_strings, temp_file) - temp_file.seek(0) + # note that we can't delete temporary files here when calling the subprocess + # async due to a race condition. The subprocess must delete them + components_file = tempfile.NamedTemporaryFile( + "w", suffix=".json", delete=False + ) + json.dump(component_strings, components_file) + compotents_file.seek(0) + if data is not None: + data_file = tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) + json.dump(data, data_file) + data_file.seek(0) + executable = sys.executable cmd = [ executable, @@ -230,6 +260,11 @@ def _run_cards_subprocess(self, runspec, component_strings): "card", "create", runspec, + "--delete-input-files", + "--card-uuid", + self._card_uuid, + "--mode", + mode, "--type", self.attributes["type"], # Add the options relating to card arguments. @@ -248,11 +283,14 @@ def _run_cards_subprocess(self, runspec, component_strings): if self.attributes["save_errors"]: cmd += ["--render-error-card"] - if temp_file is not None: - cmd += ["--component-file", temp_file.name] + if components_file is not None: + cmd += ["--component-file", components_file.name] + + if data_file is not None: + cmd += ["--data-file", data_file.name] response, fail = self._run_command( - cmd, os.environ, timeout=self.attributes["timeout"] + cmd, os.environ, timeout=self.attributes["timeout"], wait=wait ) if fail: resp = "" if response is None else response.decode("utf-8") @@ -262,19 +300,38 @@ def _run_cards_subprocess(self, runspec, component_strings): bad=True, ) - def _run_command(self, cmd, env, timeout=None): + def _run_command(self, cmd, env, wait=True, timeout=None): fail = False timeout_args = {} + async_timeout = ASYNC_TIMEOUT if timeout is not None: + async_timeout = int(timeout) + 10 timeout_args = dict(timeout=int(timeout) + 10) - try: - rep = subprocess.check_output( - cmd, env=env, stderr=subprocess.STDOUT, **timeout_args - ) - except subprocess.CalledProcessError as e: - rep = e.output - fail = True - except subprocess.TimeoutExpired as e: - rep = e.output - fail = True - return rep, fail + + if wait: + try: + rep = subprocess.check_output( + cmd, env=env, stderr=subprocess.STDOUT, **timeout_args + ) + except subprocess.CalledProcessError as e: + rep = e.output + fail = True + except subprocess.TimeoutExpired as e: + rep = e.output + fail = True + return rep, fail + else: + if self._async_proc and self._async_proc.poll() is None: + if time.time() - self._async_started > async_timeout: + self._async_proc.kill() + else: + # silently refuse to run an async process if a previous one is still running + # and timeout hasn't been reached + return "", False + else: + #print("CARD CMD", " ".join(cmd)) + self._async_proc = subprocess.Popen( + cmd, env=env, stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL + ) + self._async_started = time.time() + return "", False diff --git a/metaflow/plugins/cards/card_modules/card.py b/metaflow/plugins/cards/card_modules/card.py index 35a639299dd..a011d4e4f88 100644 --- a/metaflow/plugins/cards/card_modules/card.py +++ b/metaflow/plugins/cards/card_modules/card.py @@ -35,6 +35,7 @@ class MetaflowCard(object): type = None ALLOW_USER_COMPONENTS = False + IS_RUNTIME_CARD = False scope = "task" # can be task | run @@ -49,7 +50,8 @@ def _get_mustache(self): except ImportError: return None - def render(self, task) -> str: + # FIXME document data + def render(self, task, data=None) -> str: """ Produce custom card contents in HTML. @@ -68,6 +70,14 @@ def render(self, task) -> str: """ return NotImplementedError() + # FIXME document + def render_runtime(self, task, data): + return + + # FIXME document + def refresh(self, task, data): + return + class MetaflowCardComponent(object): def render(self): diff --git a/metaflow/plugins/cards/component_serializer.py b/metaflow/plugins/cards/component_serializer.py index 9451cd67406..afcc8bd1def 100644 --- a/metaflow/plugins/cards/component_serializer.py +++ b/metaflow/plugins/cards/component_serializer.py @@ -3,9 +3,14 @@ from .card_modules.components import UserComponent import uuid import json +import time _TYPE = type +# TODO move these to config +RUNTIME_CARD_MIN_REFRESH_INTERVAL = 5 +RUNTIME_CARD_RENDER_INTERVAL = 60 + def get_card_class(card_type): from metaflow.plugins import CARDS @@ -21,6 +26,51 @@ def __init__(self, warning_message): super().__init__("@card WARNING", warning_message) +class CardComponents: + def __init__(self, card_proc, components=None): + self._card_proc = card_proc + self._latest_user_data = None + self._last_refresh = 0 + self._last_render = 0 + + if components is None: + self._components = [] + else: + self._components = list(components) + + def append(self, component): + self._components.append(component) + + def extend(self, components): + self._components.extend(components) + + def clear(self): + self._components.clear() + + def refresh(self, data=None, force=False): + # todo make this a configurable variable + self._latest_user_data = data + nu = time.time() + if nu - self._last_refresh < RUNTIME_CARD_MIN_REFRESH_INTERVAL: + # rate limit refreshes: silently ignore requests that + # happen too frequently + return + self._last_refresh = nu + # FIXME force render if components have changed + if force or nu - self._last_render > RUNTIME_CARD_RENDER_INTERVAL: + self._card_proc("render_runtime") + self._last_render = nu + else: + self._card_proc("refresh") + + def _get_latest_data(self): + # FIXME add component data + return {"user": self._latest_user_data, "components": []} + + def __iter__(self): + return iter(self._components) + + class CardComponentCollector: """ This class helps collect `MetaflowCardComponent`s during runtime execution @@ -42,17 +92,18 @@ class CardComponentCollector: - [x] by looking it up by its type, e.g. `current.card.get(type='pytorch')`. """ - def __init__(self, logger=None): + def __init__(self, logger=None, card_proc=None): from metaflow.metaflow_config import CARD_NO_WARNING self._cards_components = ( {} - ) # a dict with key as uuid and value as a list of MetaflowCardComponent. + ) # a dict with key as uuid and value as CardComponents, holding a list of MetaflowCardComponents. self._cards_meta = ( {} ) # a `dict` of (card_uuid, `dict)` holding all metadata about all @card decorators on the `current` @step. self._card_id_map = {} # card_id to uuid map for all cards with ids self._logger = logger + self._card_proc = card_proc # `self._default_editable_card` holds the uuid of the card that is default editable. This card has access to `append`/`extend` methods of `self` self._default_editable_card = None self._warned_once = {"__getitem__": {}, "append": False, "extend": False} @@ -60,7 +111,7 @@ def __init__(self, logger=None): @staticmethod def create_uuid(): - return str(uuid.uuid4()) + return str(uuid.uuid4()).replace("-", "") def _log(self, *args, **kwargs): if self._logger: @@ -97,7 +148,7 @@ def _add_card( suppress_warnings=suppress_warnings, ) self._cards_meta[card_uuid] = card_metadata - self._cards_components[card_uuid] = [] + self._cards_components[card_uuid] = CardComponents(self._card_proc) return card_metadata def _warning(self, message): @@ -229,7 +280,7 @@ def __getitem__(self, key): Returns ------- - CardComponentCollector + CardComponents An object with `append` and `extend` calls which allow you to add components to the chosen card. """ @@ -275,7 +326,7 @@ def __setitem__(self, key, value): ) self._warning(_warning_msg) return - self._cards_components[card_uuid] = value + self._cards_components[card_uuid] = CardComponents(self._card_proc, value) return self._warning( @@ -359,6 +410,20 @@ def extend(self, components): self._cards_components[self._default_editable_card].extend(components) + def clear(self): + if self._default_editable_card is not None: + self._cards_components[self._default_editable_card].clear() + + def refresh(self, *args, **kwargs): + if self._default_editable_card is not None: + self._cards_components[self._default_editable_card].refresh(*args, **kwargs) + + def _get_latest_data(self, card_uuid): + """ + Returns latest data so it can be used in the final render() call + """ + return self._cards_components[card_uuid]._get_latest_data() + def _serialize_components(self, card_uuid): """ This method renders components present in a card to strings/json.