diff --git a/metaflow/cli.py b/metaflow/cli.py index 2f84307d639..095fa2011e3 100644 --- a/metaflow/cli.py +++ b/metaflow/cli.py @@ -27,6 +27,7 @@ from .metaflow_current import current from metaflow.system import _system_monitor, _system_logger from .metaflow_environment import MetaflowEnvironment +from .packaging_sys import MFContent from .plugins import ( DATASTORES, ENVIRONMENTS, @@ -326,6 +327,11 @@ def start( echo(" executing *%s*" % ctx.obj.flow.name, fg="magenta", nl=False) echo(" for *%s*" % resolve_identity(), fg="magenta") + # Check if we need to set up the distribution finder (if running from a code package) + dist_info = MFContent.get_distribution_finder() + if dist_info: + sys.meta_path.append(dist_info) + # Setup the context cli_args._set_top_kwargs(ctx.params) ctx.obj.echo = echo diff --git a/metaflow/cli_components/init_cmd.py b/metaflow/cli_components/init_cmd.py index fdd64bdcc54..92e18ee9e57 100644 --- a/metaflow/cli_components/init_cmd.py +++ b/metaflow/cli_components/init_cmd.py @@ -46,6 +46,7 @@ def init(obj, run_id=None, task_id=None, tags=None, **kwargs): obj.event_logger, obj.monitor, run_id=run_id, + skip_decorator_hooks=True, ) obj.flow._set_constants(obj.graph, kwargs, obj.config_options) runtime.persist_constants(task_id=task_id) diff --git a/metaflow/cli_components/run_cmds.py b/metaflow/cli_components/run_cmds.py index 8a47c057597..500f3a33788 100644 --- a/metaflow/cli_components/run_cmds.py +++ b/metaflow/cli_components/run_cmds.py @@ -61,7 +61,11 @@ def before_run(obj, tags, decospecs): # We explicitly avoid doing this in `start` since it is invoked for every # step in the run. obj.package = MetaflowPackage( - obj.flow, obj.environment, obj.echo, obj.package_suffixes + obj.flow, + obj.environment, + obj.echo, + suffixes=obj.package_suffixes, + flow_datastore=obj.flow_datastore, ) diff --git a/metaflow/client/core.py b/metaflow/client/core.py index 4edbcdac00c..eee4eb390ef 100644 --- a/metaflow/client/core.py +++ b/metaflow/client/core.py @@ -32,11 +32,13 @@ from metaflow.includefile import IncludedFile from metaflow.metaflow_config import DEFAULT_METADATA, MAX_ATTEMPTS from metaflow.metaflow_environment import MetaflowEnvironment +from metaflow.package import MetaflowPackage +from metaflow.packaging_sys import ContentType +from metaflow.packaging_sys.tar_backend import TarPackagingBackend from metaflow.plugins import ENVIRONMENTS, METADATA_PROVIDERS from metaflow.unbounded_foreach import CONTROL_TASK_TAG from metaflow.util import cached_property, is_stringish, resolve_identity, to_unicode -from ..info_file import INFO_FILE from .filecache import FileCache if TYPE_CHECKING: @@ -816,20 +818,27 @@ def __init__(self, flow_name: str, code_package: str): self._path = info["location"] self._ds_type = info["ds_type"] self._sha = info["sha"] + self._code_metadata = info.get("metadata") + if self._code_metadata is None: + # Default string + self._code_metadata = ( + '{"version": 0, "backend": "tgz", "mfcontent_version": 0}' + ) + self._backend = MetaflowPackage.get_backend(self._code_metadata) if filecache is None: filecache = FileCache() _, blobdata = filecache.get_data( self._ds_type, self._flow_name, self._path, self._sha ) - code_obj = BytesIO(blobdata) - self._tar = tarfile.open(fileobj=code_obj, mode="r:gz") - # The JSON module in Python3 deals with Unicode. Tar gives bytes.
- info_str = ( - self._tar.extractfile(os.path.basename(INFO_FILE)).read().decode("utf-8") - ) - self._info = json.loads(info_str) - self._flowspec = self._tar.extractfile(self._info["script"]).read() + self._code_obj = BytesIO(blobdata) + self._info = MetaflowPackage.cls_get_info(self._code_metadata, self._code_obj) + if self._info: + self._flowspec = MetaflowPackage.cls_get_content( + self._code_metadata, self._code_obj, self._info["script"] + ) + else: + raise MetaflowInternalError("Code package metadata is invalid.") @property def path(self) -> str: @@ -877,7 +886,9 @@ def tarball(self) -> tarfile.TarFile: TarFile TarFile for everything in this code package """ - return self._tar + if self._backend == TarPackagingBackend: + return self._backend.cls_open(self._code_obj) + raise RuntimeError("Archive is not a tarball") def extract(self) -> TemporaryDirectory: """ @@ -908,27 +919,10 @@ def extract(self) -> TemporaryDirectory: The directory and its contents are automatically deleted when this object is garbage collected. """ - exclusions = [ - "metaflow/", - "metaflow_extensions/", - "INFO", - "CONFIG_PARAMETERS", - "conda.manifest", - # This file is created when using the conda/pypi features available in - # nflx-metaflow-extensions: https://github.com/Netflix/metaflow-nflx-extensions - "condav2-1.cnd", - ] - members = [ - m - for m in self.tarball.getmembers() - if not any( - (x.endswith("/") and m.name.startswith(x)) or (m.name == x) - for x in exclusions - ) - ] tmp = TemporaryDirectory() - self.tarball.extractall(tmp.name, members) + MetaflowPackage.cls_extract_into( + self._code_metadata, self._code_obj, tmp.name, ContentType.USER_CONTENT + ) return tmp @property diff --git a/metaflow/debug.py b/metaflow/debug.py index fbfaca95dc8..1462df2bb7a 100644 --- a/metaflow/debug.py +++ b/metaflow/debug.py @@ -42,6 +42,11 @@ def log(self, typ, args): filename = inspect.stack()[1][1] print("debug[%s %s:%s]: %s" % (typ, filename, lineno, s), file=sys.stderr) + def __getattr__(self, name): + # Small piece of code to get pyright and other linters to recognize that there + # are dynamic attributes. + return getattr(self, name) + def noop(self, args): pass diff --git a/metaflow/decorators.py b/metaflow/decorators.py index 7c05bdcb312..ae7310a4840 100644 --- a/metaflow/decorators.py +++ b/metaflow/decorators.py @@ -21,11 +21,6 @@ from metaflow._vendor import click -try: - unicode -except NameError: - unicode = str - basestring = str # Contains the decorators on which _init was called. We want to ensure it is called # only once on each decorator and, as the _init() function below can be called in @@ -189,7 +184,7 @@ def make_decorator_spec(self): # escaping but for more complex types (typically dictionaries or lists), # we dump using JSON. for k, v in attrs.items(): - if isinstance(v, (int, float, unicode, basestring)): + if isinstance(v, (int, float, str)): attr_list.append("%s=%s" % (k, str(v))) else: attr_list.append("%s=%s" % (k, json.dumps(v).replace('"', '\\"'))) @@ -315,15 +310,36 @@ def package_init(self, flow, step_name, environment): def add_to_package(self): """ - Called to add custom packages needed for a decorator. This hook will be + Called to add custom files needed for this decorator. This hook will be called in the `MetaflowPackage` class where metaflow compiles the code package - tarball. This hook is invoked in the `MetaflowPackage`'s `path_tuples` - function.
The `path_tuples` function is a generator that yields a tuple of - `(file_path, arcname)`.`file_path` is the path of the file in the local file system; - the `arcname` is the path of the file in the constructed tarball or the path of the file - after decompressing the tarball. - - Returns a list of tuples where each tuple represents (file_path, arcname) + tarball. This hook can return one of two things (the first is for backwards + compatibility -- move to the second): + - a generator yielding a tuple of `(file_path, arcname)` to add files to + the code package. `file_path` is the path to the file on the local filesystem + and `arcname` is the path relative to the packaged code. + - a generator yielding a tuple of `(content, arcname, type)` where: + - type is one of + ContentType.{USER_CONTENT, CODE_CONTENT, MODULE_CONTENT, OTHER_CONTENT} + - for USER_CONTENT: + - the file will be included relative to the directory containing the + user's flow file. + - content: path to the file to include + - arcname: path relative to the directory containing the user's flow file + - for CODE_CONTENT: + - the file will be included relative to the code directory in the package. + This will be the directory containing `metaflow`. + - content: path to the file to include + - arcname: path relative to the code directory in the package + - for MODULE_CONTENT: + - the module will be added to the code package as a python module. It will + be accessible as usual (import <module_name>) + - content: name of the module + - arcname: None (ignored) + - for OTHER_CONTENT: + - the file will be included relative to any other configuration/metadata + files for the flow + - content: path to the file to include + - arcname: path relative to the config directory in the package """ return [] @@ -685,12 +701,8 @@ def foo(self): f.is_step = True f.decorators = [] f.config_decorators = [] - try: - # python 3 - f.name = f.__name__ - except: - # python 2 - f.name = f.__func__.func_name + f.wrappers = [] + f.name = f.__name__ return f diff --git a/metaflow/extension_support/__init__.py b/metaflow/extension_support/__init__.py index 1173a6af71e..81f5f5be2b3 100644 --- a/metaflow/extension_support/__init__.py +++ b/metaflow/extension_support/__init__.py @@ -12,7 +12,7 @@ from itertools import chain from pathlib import Path -from metaflow.info_file import read_info_file +from metaflow.meta_files import read_info_file # @@ -214,6 +214,10 @@ def package_mfext_all(): yield path_tuple +def package_mfext_all_descriptions(): + return _all_packages + + def load_globals(module, dst_globals, extra_indent=False): if extra_indent: extra_indent = " " @@ -808,13 +812,16 @@ def _get_extension_packages(ignore_info_file=False, restrict_to_directories=None " Extends '%s' with config '%s'" % (_extension_points[idx], config_module) ) - mf_pkg_list.append(package_name) - mf_ext_packages[package_name] = { - "root_paths": [package_path], - "meta_module": meta_module, - "files": files_to_include, - "version": "_local_", - } + if files_to_include: + mf_pkg_list.append(package_name) + mf_ext_packages[package_name] = { + "root_paths": [package_path], + "meta_module": meta_module, + "files": files_to_include, + "version": "_local_", + } + else: + _ext_debug("Skipping package as no files found (empty dir?)") # Sanity check that we only have one package per configuration file.
# This prevents multiple packages from providing the same named configuration diff --git a/metaflow/extension_support/_empty_file.py b/metaflow/extension_support/_empty_file.py index d59e1556ddb..dbdcba34c17 100644 --- a/metaflow/extension_support/_empty_file.py +++ b/metaflow/extension_support/_empty_file.py @@ -1,2 +1,2 @@ -# This file serves as a __init__.py for metaflow_extensions when it is packaged -# and needs to remain empty. +# This file serves as a __init__.py for metaflow_extensions or metaflow +# packages when they are packaged and needs to remain empty. diff --git a/metaflow/info_file.py b/metaflow/info_file.py deleted file mode 100644 index 6d56a6152ba..00000000000 --- a/metaflow/info_file.py +++ /dev/null @@ -1,25 +0,0 @@ -import json - -from os import path - -CURRENT_DIRECTORY = path.dirname(path.abspath(__file__)) -INFO_FILE = path.join(path.dirname(CURRENT_DIRECTORY), "INFO") - -_info_file_content = None -_info_file_present = None - - -def read_info_file(): - global _info_file_content - global _info_file_present - if _info_file_present is None: - _info_file_present = path.exists(INFO_FILE) - if _info_file_present: - try: - with open(INFO_FILE, "r", encoding="utf-8") as contents: - _info_file_content = json.load(contents) - except IOError: - pass - if _info_file_present: - return _info_file_content - return None diff --git a/metaflow/meta_files.py b/metaflow/meta_files.py new file mode 100644 index 00000000000..084eaffaa71 --- /dev/null +++ b/metaflow/meta_files.py @@ -0,0 +1,13 @@ +_UNINITIALIZED = object() +_info_file_content = _UNINITIALIZED + + +def read_info_file(): + # Prevent circular import + from .packaging_sys import MFContent + + global _info_file_content + + if id(_info_file_content) == id(_UNINITIALIZED): + _info_file_content = MFContent.get_info() + return _info_file_content diff --git a/metaflow/metadata_provider/metadata.py b/metaflow/metadata_provider/metadata.py index b7256eb2924..0f88be8c964 100644 --- a/metaflow/metadata_provider/metadata.py +++ b/metaflow/metadata_provider/metadata.py @@ -674,11 +674,17 @@ def _register_system_metadata(self, run_id, step_name, task_id, attempt): if code_sha: code_url = os.environ.get("METAFLOW_CODE_URL") code_ds = os.environ.get("METAFLOW_CODE_DS") + code_metadata = os.environ.get("METAFLOW_CODE_METADATA") metadata.append( MetaDatum( field="code-package", value=json.dumps( - {"ds_type": code_ds, "sha": code_sha, "location": code_url} + { + "ds_type": code_ds, + "sha": code_sha, + "location": code_url, + "metadata": code_metadata, + } ), type="code-package", tags=["attempt_id:{0}".format(attempt)], diff --git a/metaflow/metaflow_config.py b/metaflow/metaflow_config.py index f13dcfe4d1f..351b1102699 100644 --- a/metaflow/metaflow_config.py +++ b/metaflow/metaflow_config.py @@ -460,6 +460,7 @@ "stubgen", "userconf", "conda", + "package", ] for typ in DEBUG_OPTIONS: diff --git a/metaflow/metaflow_environment.py b/metaflow/metaflow_environment.py index 354d21a8011..035b5f8e2f3 100644 --- a/metaflow/metaflow_environment.py +++ b/metaflow/metaflow_environment.py @@ -8,6 +8,8 @@ from metaflow.exception import MetaflowException from metaflow.extension_support import dump_module_info from metaflow.mflog import BASH_MFLOG, BASH_FLUSH_LOGS +from metaflow.package import MetaflowPackage + from . import R @@ -49,8 +51,36 @@ def bootstrap_commands(self, step_name, datastore_type): def add_to_package(self): """ - A list of tuples (file, arcname) to add to the job package. 
- `arcname` is an alternative name for the file in the job package. + Called to add custom files needed for this environment. This hook will be + called in the `MetaflowPackage` class where metaflow compiles the code package + tarball. This hook can return one of two things (the first is for backwards + compatibility -- move to the second): + - a generator yielding a tuple of `(file_path, arcname)` to add files to + the code package. `file_path` is the path to the file on the local filesystem + and `arcname` is the path relative to the packaged code. + - a generator yielding a tuple of `(content, arcname, type)` where: + - type is one of + ContentType.{USER_CONTENT, CODE_CONTENT, MODULE_CONTENT, OTHER_CONTENT} + - for USER_CONTENT: + - the file will be included relative to the directory containing the + user's flow file. + - content: path to the file to include + - arcname: path relative to the directory containing the user's flow file + - for CODE_CONTENT: + - the file will be included relative to the code directory in the package. + This will be the directory containing `metaflow`. + - content: path to the file to include + - arcname: path relative to the code directory in the package + - for MODULE_CONTENT: + - the module will be added to the code package as a python module. It will + be accessible as usual (import <module_name>) + - content: name of the module + - arcname: None (ignored) + - for OTHER_CONTENT: + - the file will be included relative to any other configuration/metadata + files for the flow + - content: path to the file to include + - arcname: path relative to the config directory in the package """ return [] @@ -157,29 +187,43 @@ def _get_install_dependencies_cmd(self, datastore_type): # skip pip installs if we know that packages might already be available return "if [ -z $METAFLOW_SKIP_INSTALL_DEPENDENCIES ]; then {}; fi".format(cmd) - def get_package_commands(self, code_package_url, datastore_type): - cmds = [ - BASH_MFLOG, - BASH_FLUSH_LOGS, - "mflog 'Setting up task environment.'", - self._get_install_dependencies_cmd(datastore_type), - "mkdir metaflow", - "cd metaflow", - "mkdir .metaflow", # mute local datastore creation log - "i=0; while [ $i -le 5 ]; do " - "mflog 'Downloading code package...'; " - + self._get_download_code_package_cmd(code_package_url, datastore_type) - + " && mflog 'Code package downloaded.' && break; " - "sleep 10; i=$((i+1)); " - "done", - "if [ $i -gt 5 ]; then " - "mflog 'Failed to download code package from %s " - "after 6 tries. Exiting...' && exit 1; " - "fi" % code_package_url, - "TAR_OPTIONS='--warning=no-timestamp' tar xf job.tar", - "mflog 'Task is starting.'", - "flush_mflogs", - ] + def get_package_commands( + self, code_package_metadata, code_package_url, datastore_type + ): + cmds = ( + [ + BASH_MFLOG, + BASH_FLUSH_LOGS, + "mflog 'Setting up task environment.'", + self._get_install_dependencies_cmd(datastore_type), + "mkdir metaflow", + "cd metaflow", + "mkdir .metaflow", # mute local datastore creation log + "i=0; while [ $i -le 5 ]; do " + "mflog 'Downloading code package...'; " + + self._get_download_code_package_cmd(code_package_url, datastore_type) + + " && mflog 'Code package downloaded.' && break; " + "sleep 10; i=$((i+1)); " + "done", + "if [ $i -gt 5 ]; then " + "mflog 'Failed to download code package from %s " + "after 6 tries. Exiting...' && exit 1; " + "fi" % code_package_url, + ] + + MetaflowPackage.get_extract_commands( + code_package_metadata, "job.tar", dest_dir="."
+ ) + + [ + "export %s=%s:$(printenv %s)" % (k, v.replace('"', '\\"'), k) + for k, v in MetaflowPackage.get_post_extract_env_vars( + code_package_metadata, dest_dir="." + ).items() + ] + + [ + "mflog 'Task is starting.'", + "flush_mflogs", + ] + ) return cmds def get_environment_info(self, include_ext_info=False): diff --git a/metaflow/metaflow_version.py b/metaflow/metaflow_version.py index 9f47444de24..90badf33657 100644 --- a/metaflow/metaflow_version.py +++ b/metaflow/metaflow_version.py @@ -11,7 +11,7 @@ from os import path, name, environ, listdir from metaflow.extension_support import update_package_info -from metaflow.info_file import CURRENT_DIRECTORY, read_info_file +from metaflow.meta_files import read_info_file # True/False correspond to the value `public`` in get_version diff --git a/metaflow/package.py b/metaflow/package.py deleted file mode 100644 index 1385883d5a7..00000000000 --- a/metaflow/package.py +++ /dev/null @@ -1,203 +0,0 @@ -import importlib -import os -import sys -import tarfile -import time -import json -from io import BytesIO - -from .user_configs.config_parameters import CONFIG_FILE, dump_config_values -from .extension_support import EXT_PKG, package_mfext_all -from .metaflow_config import DEFAULT_PACKAGE_SUFFIXES -from .exception import MetaflowException -from .util import to_unicode -from . import R -from .info_file import INFO_FILE - -DEFAULT_SUFFIXES_LIST = DEFAULT_PACKAGE_SUFFIXES.split(",") -METAFLOW_SUFFIXES_LIST = [".py", ".html", ".css", ".js"] - - -class NonUniqueFileNameToFilePathMappingException(MetaflowException): - headline = "Non Unique file path for a file name included in code package" - - def __init__(self, filename, file_paths, lineno=None): - msg = ( - "Filename %s included in the code package includes multiple different paths for the same name : %s.\n" - "The `filename` in the `add_to_package` decorator hook requires a unique `file_path` to `file_name` mapping" - % (filename, ", ".join(file_paths)) - ) - super().__init__(msg=msg, lineno=lineno) - - -# this is os.walk(follow_symlinks=True) with cycle detection -def walk_without_cycles(top_root): - seen = set() - - def _recurse(root): - for parent, dirs, files in os.walk(root): - for d in dirs: - path = os.path.join(parent, d) - if os.path.islink(path): - # Breaking loops: never follow the same symlink twice - # - # NOTE: this also means that links to sibling links are - # not followed. 
In this case: - # - # x -> y - # y -> oo - # oo/real_file - # - # real_file is only included twice, not three times - reallink = os.path.realpath(path) - if reallink not in seen: - seen.add(reallink) - for x in _recurse(path): - yield x - yield parent, files - - for x in _recurse(top_root): - yield x - - -class MetaflowPackage(object): - def __init__(self, flow, environment, echo, suffixes=DEFAULT_SUFFIXES_LIST): - self.suffixes = list(set().union(suffixes, DEFAULT_SUFFIXES_LIST)) - self.environment = environment - self.metaflow_root = os.path.dirname(__file__) - - self.flow_name = flow.name - self._flow = flow - self.create_time = time.time() - environment.init_environment(echo) - for step in flow: - for deco in step.decorators: - deco.package_init(flow, step.__name__, environment) - self.blob = self._make() - - def _walk(self, root, exclude_hidden=True, suffixes=None): - if suffixes is None: - suffixes = [] - root = to_unicode(root) # handle files/folder with non ascii chars - prefixlen = len("%s/" % os.path.dirname(root)) - for ( - path, - files, - ) in walk_without_cycles(root): - if exclude_hidden and "/." in path: - continue - # path = path[2:] # strip the ./ prefix - # if path and (path[0] == '.' or './' in path): - # continue - for fname in files: - if (fname[0] == "." and fname in suffixes) or ( - fname[0] != "." - and any(fname.endswith(suffix) for suffix in suffixes) - ): - p = os.path.join(path, fname) - yield p, p[prefixlen:] - - def path_tuples(self): - """ - Returns list of (path, arcname) to be added to the job package, where - `arcname` is the alternative name for the file in the package. - """ - # We want the following contents in the tarball - # Metaflow package itself - for path_tuple in self._walk( - self.metaflow_root, exclude_hidden=False, suffixes=METAFLOW_SUFFIXES_LIST - ): - yield path_tuple - - # Metaflow extensions; for now, we package *all* extensions but this may change - # at a later date; it is possible to call `package_mfext_package` instead of - # `package_mfext_all` but in that case, make sure to also add a - # metaflow_extensions/__init__.py file to properly "close" the metaflow_extensions - # package and prevent other extensions from being loaded that may be - # present in the rest of the system - for path_tuple in package_mfext_all(): - yield path_tuple - - # Any custom packages exposed via decorators - deco_module_paths = {} - for step in self._flow: - for deco in step.decorators: - for path_tuple in deco.add_to_package(): - file_path, file_name = path_tuple - # Check if the path is not duplicated as - # many steps can have the same packages being imported - if file_name not in deco_module_paths: - deco_module_paths[file_name] = file_path - yield path_tuple - elif deco_module_paths[file_name] != file_path: - raise NonUniqueFileNameToFilePathMappingException( - file_name, [deco_module_paths[file_name], file_path] - ) - - # the package folders for environment - for path_tuple in self.environment.add_to_package(): - yield path_tuple - if R.use_r(): - # the R working directory - for path_tuple in self._walk( - "%s/" % R.working_dir(), suffixes=self.suffixes - ): - yield path_tuple - # the R package - for path_tuple in R.package_paths(): - yield path_tuple - else: - # the user's working directory - flowdir = os.path.dirname(os.path.abspath(sys.argv[0])) + "/" - for path_tuple in self._walk(flowdir, suffixes=self.suffixes): - yield path_tuple - - def _add_configs(self, tar): - buf = BytesIO() - 
buf.write(json.dumps(dump_config_values(self._flow)).encode("utf-8")) - self._add_file(tar, os.path.basename(CONFIG_FILE), buf) - - def _add_info(self, tar): - buf = BytesIO() - buf.write( - json.dumps( - self.environment.get_environment_info(include_ext_info=True) - ).encode("utf-8") - ) - self._add_file(tar, os.path.basename(INFO_FILE), buf) - - @staticmethod - def _add_file(tar, filename, buf): - info = tarfile.TarInfo(filename) - buf.seek(0) - info.size = len(buf.getvalue()) - # Setting this default to Dec 3, 2019 - info.mtime = 1575360000 - tar.addfile(info, buf) - - def _make(self): - def no_mtime(tarinfo): - # a modification time change should not change the hash of - # the package. Only content modifications will. - # Setting this default to Dec 3, 2019 - tarinfo.mtime = 1575360000 - return tarinfo - - buf = BytesIO() - with tarfile.open( - fileobj=buf, mode="w:gz", compresslevel=3, dereference=True - ) as tar: - self._add_info(tar) - self._add_configs(tar) - for path, arcname in self.path_tuples(): - tar.add(path, arcname=arcname, recursive=False, filter=no_mtime) - - blob = bytearray(buf.getvalue()) - blob[4:8] = [0] * 4 # Reset 4 bytes from offset 4 to account for ts - return blob - - def __str__(self): - return "<code package for flow %s (created @ %s)>" % ( - self.flow_name, - time.strftime("%a, %d %b %Y %H:%M:%S", self.create_time), - ) diff --git a/metaflow/package/__init__.py b/metaflow/package/__init__.py new file mode 100644 index 00000000000..2843f4b1850 --- /dev/null +++ b/metaflow/package/__init__.py @@ -0,0 +1,646 @@ +import json +import os +import sys +import threading +import time + +from io import BytesIO +from types import ModuleType +from typing import Callable, Dict, List, Optional, TYPE_CHECKING, Type, cast + +from ..debug import debug +from ..packaging_sys import ContentType, MFContent +from ..packaging_sys.backend import PackagingBackend +from ..packaging_sys.tar_backend import TarPackagingBackend +from ..packaging_sys.v1 import MFContentV1 +from ..packaging_sys.utils import suffix_filter, walk +from ..metaflow_config import DEFAULT_PACKAGE_SUFFIXES +from ..exception import MetaflowException +from ..user_configs.config_parameters import dump_config_values +from ..util import get_metaflow_root +from ..
import R + +DEFAULT_SUFFIXES_LIST = DEFAULT_PACKAGE_SUFFIXES.split(",") + + +if TYPE_CHECKING: + import metaflow.datastore + + +class NonUniqueFileNameToFilePathMappingException(MetaflowException): + headline = "Non-unique file path for a file name included in code package" + + def __init__(self, filename, file_paths, lineno=None): + msg = ( + "Filename %s included in the code package includes multiple different " + "paths for the same name: %s.\n" + "The `filename` in the `add_to_package` decorator hook requires a unique " + "`file_path` to `file_name` mapping" % (filename, ", ".join(file_paths)) + ) + super().__init__(msg=msg, lineno=lineno) + + +class MetaflowPackage(object): + def __init__( + self, + flow, + environment, + echo, + suffixes: Optional[List[str]] = DEFAULT_SUFFIXES_LIST, + user_code_filter: Optional[Callable[[str], bool]] = None, + flow_datastore: Optional["metaflow.datastore.FlowDataStore"] = None, + mfcontent: Optional[MFContent] = None, + exclude_tl_dirs=None, + backend: Type[PackagingBackend] = TarPackagingBackend, + ): + self._environment = environment + self._environment.init_environment(echo) + + self._echo = echo + self._flow = flow + self._flow_datastore = flow_datastore + self._backend = backend + + # Info about the package + self._name = None + self._create_time = time.time() + self._user_flow_dir = None + + # Content of the package (and settings on how to create it) + if suffixes is not None: + self._suffixes = list(set().union(suffixes, DEFAULT_SUFFIXES_LIST)) + else: + self._suffixes = None + + if mfcontent is None: + self._mfcontent = MFContentV1( + criteria=lambda x: hasattr(x, "METAFLOW_PACKAGE"), + ) + else: + self._mfcontent = mfcontent + # We exclude the environment when packaging as this will be packaged separately. + # This comes into play primarily if packaging from a node already running packaged + # code. + # These directories are only excluded at the top-level (ie: not further down + # in sub-directories) + # "_escape_trampolines" is a special directory where trampoline escape hatch + # files are stored (used by Netflix Extension's Conda implementation).
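# (Editorial illustration, not part of the patch; the names below are
# hypothetical: if get_excluded_tl_entries() returned [".mf_code"] and a caller
# passed exclude_tl_dirs=["notebooks"], the list assembled below would be
# [".mf_code", "_escape_trampolines", "notebooks"], and those entries would be
# skipped only at the top level of the user's flow directory.)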
+ self._exclude_tl_dirs = ( + self._mfcontent.get_excluded_tl_entries() + + ["_escape_trampolines"] + + (exclude_tl_dirs or []) + ) + + if self._suffixes is not None and user_code_filter is not None: + self._user_code_filter = lambda x, f1=suffix_filter( + self._suffixes + ), f2=user_code_filter: f1(x) and f2(x) + self._filter_type = "suffixes and user filter" + elif self._suffixes is not None: + self._user_code_filter = suffix_filter(self._suffixes) + self._filter_type = "suffixes" + elif user_code_filter is not None: + self._user_code_filter = user_code_filter + self._filter_type = "user filter" + else: + self._user_code_filter = lambda x: True + self._filter_type = "no filter" + + # Info about the package creation (it happens async) + self._is_package_available = None + self._blob_sha = None + self._blob_url = None + self._blob = None + + # We launch a thread to create the package asynchronously and upload + # it opportunistically + self._create_thread = threading.Thread( + target=self._package_and_upload, + daemon=True, + ) + self._create_thread.start() + + def blob(self, timeout: Optional[float] = None) -> BytesIO: + if self._blob is None: + self._create_thread.join(timeout) + if self._is_package_available is not None: + # We have our result now + if self._is_package_available: + return self._blob + else: + raise self._packaging_exception + return self._blob + + def package_sha(self, timeout: Optional[float] = None) -> Optional[str]: + if self._blob_sha is None: + self._create_thread.join(timeout) + if self._is_package_available is not None: + # We have our result now + if self._is_package_available: + return self._blob_sha + else: + raise self._packaging_exception + return self._blob_sha + + def package_url(self, timeout: Optional[float] = None) -> Optional[str]: + if self._blob_url is None: + self._create_thread.join(timeout) + if self._is_package_available is not None: + # We have our result now + if self._is_package_available: + return self._blob_url + else: + raise self._packaging_exception + return self._blob_url + + @property + def package_metadata(self): + return json.dumps( + { + "version": 0, + "backend": self._backend.backend_type, + "mfcontent_version": self._mfcontent.get_package_version(), + } + ) + + @classmethod + def get_backend(cls, pkg_metadata: str) -> PackagingBackend: + """ + Method to get the backend type from the package metadata. + + Parameters + ---------- + pkg_metadata : str + The metadata of the package to extract. + + Returns + ------- + PackagingBackend + The backend type that can be used to extract the package. + """ + backend_type = json.loads(pkg_metadata).get("backend", "tgz") + return PackagingBackend.get_backend(backend_type) + + @classmethod + def get_extract_commands( + cls, pkg_metadata: str, archive_path: str, dest_dir: str = "." + ) -> List[str]: + """ + Method to get the commands needed to extract the package into + the directory dest_dir. Note that this will return a list of commands + that can be passed to subprocess.run for example. + + Parameters + ---------- + pkg_metadata : str + The metadata of the package to extract. + archive_path : str + The path to the archive to extract. + dest_dir : str, default "." + The directory to extract the package into. + + Returns + ------- + List[str] + The commands needed to extract the package into the directory dest_dir. 
+ """ + backend_type = json.loads(pkg_metadata).get("backend", "tgz") + # We now ask the backend type how to extract itself + backend = PackagingBackend.get_backend(backend_type) + cmds = backend.get_extract_commands(archive_path, dest_dir) + debug.package_exec(f"Command to extract {archive_path} into {dest_dir}: {cmds}") + return cmds + + @classmethod + def get_post_extract_env_vars( + cls, pkg_metadata: str, dest_dir: str = "." + ) -> Dict[str, str]: + """ + Method to get the environment variables needed to access the content + that has been extracted into the directory dest_dir. This will + typically involve setting PYTHONPATH + + Parameters + ---------- + pkg_metadata : str + The metadata of the package to extract. + dest_dir : str, default "." + The directory where the content has been extracted to. + + Returns + ------- + Dict[str, str] + The post-extract environment variables that are needed to access the content + that has been extracted into dest_dir. + """ + mfcontent_version = json.loads(pkg_metadata).get("mfcontent_version", 0) + env_vars = MFContent.get_post_extract_env_vars(mfcontent_version, dest_dir) + debug.package_exec( + f"Environment variables to access content extracted into {dest_dir}: {env_vars}" + ) + return env_vars + + @classmethod + def cls_get_content( + cls, pkg_metadata, archive: BytesIO, name: str + ) -> Optional[bytes]: + """ + Method to get the content of a member in the package archive. + + Parameters + ---------- + pkg_metadata : str + The metadata of the package to extract. + archive : BytesIO + The archive to extract the member from. + name : str + The name of the member to extract. + + Returns + ------- + Optional[bytes] + The content of the member if it exists, None otherwise. + """ + backend = cls.get_backend(pkg_metadata) + with backend.cls_open(archive) as opened_archive: + return backend.cls_get_member(opened_archive, name) + + @classmethod + def cls_get_info(cls, pkg_metadata, archive: BytesIO) -> Optional[Dict[str, str]]: + """ + Method to get the info of the package from the archive. + Parameters + ---------- + pkg_metadata : str + The metadata of the package to extract. + archive : BytesIO + The archive to extract the info from. + Returns + ------- + Optional[Dict[str, str]] + The info of the package if it exists, None otherwise. + """ + backend = cls.get_backend(pkg_metadata) + with backend.cls_open(archive) as opened_archive: + return MFContent.get_archive_info(opened_archive, backend) + + @classmethod + def cls_get_config( + cls, pkg_metadata: str, archive: BytesIO + ) -> Optional[Dict[str, str]]: + """ + Method to get the config of the package from the archive. + + Parameters + ---------- + pkg_metadata : str + The metadata of the package to extract. + archive : BytesIO + The archive to extract the config from. + + Returns + ------- + Optional[Dict[str, str]] + The config of the package if it exists, None otherwise. + """ + backend = cls.get_backend(pkg_metadata) + with backend.cls_open(archive) as opened_archive: + return MFContent.get_archive_config(opened_archive, backend) + + @classmethod + def cls_extract_into( + cls, + pkg_metadata: str, + archive: BytesIO, + dest_dir: str = ".", + content_types: int = ContentType.ALL_CONTENT.value, + ): + """ + Method to extract the package archive into a directory. + + Parameters + ---------- + pkg_metadata : str + The metadata of the package to extract. + archive : BytesIO + The archive to extract. + dest_dir : str, default "." + The directory to extract the package into. 
+ content_types : int, default ALL_CONTENT + The types of content to extract. This is a bitmask of ContentType values. + """ + backend = cls.get_backend(pkg_metadata) + with backend.cls_open(archive) as opened_archive: + include_names = MFContent.get_archive_content_names( + opened_archive, content_types, backend + ) + backend.extract_members(include_names, dest_dir) + + def user_tuples(self, timeout: Optional[float] = None): + # Wait for at least the blob to be formed + _ = self.blob(timeout=timeout) + for path, arcname in self._cached_user_members: + yield path, arcname + + def path_tuples(self, timeout: Optional[float] = None): + # Wait for at least the blob to be formed + _ = self.blob(timeout=timeout) + # Files included in the environment + yield from self._mfcontent.content_names() + + # Files included in the user code + yield from self.user_tuples() + + def show(self, timeout: Optional[float] = None) -> str: + # Human-readable content of the package + blob = self.blob(timeout=timeout) # Ensure the package is created + lines = [ + f"Package size: {self._format_size(len(blob))}", + f"Number of files: {sum(1 for _ in self.path_tuples())}", + self._mfcontent.show(), + ] + + if self._flow: + lines.append(f"\nUser code in {self._name}:") + lines.append(f" - Packaged from directory {self._user_flow_dir}") + if self._filter_type != "no filter": + if self._suffixes: + lines.append( + f" - Filtered by suffixes: {', '.join(self._suffixes)}" + ) + else: + lines.append(f" - Filtered by {self._filter_type}") + else: + lines.append(" - No user code filter applied") + if self._exclude_tl_dirs: + lines.append( + f" - Excluded directories: {', '.join(self._exclude_tl_dirs)}" + ) + return "\n".join(lines) + + def get_content( + self, name: str, content_type: ContentType, timeout: Optional[float] = None + ) -> Optional[bytes]: + """ + Method to get the content of a file within the package. This method + should be used for one-off access to small-ish files. If more files are + needed, use extract_into to extract the package into a directory and + then access the files from there. + + Parameters + ---------- + name : str + The name of the file to get the content of. Note that this + is not necessarily the name in the archive but is the name + that was passed in when creating the archive (in the archive, + it may be prefixed by some directory structure). + content_type : ContentType + The type of file to get the content of. + timeout : Optional[float], default None + Time to wait for the package to be created (it is created + asynchronously) before accessing the file. + + Returns + ------- + Optional[bytes] + The content of the file. If the file is not found, None is returned.
+ """ + # Wait for at least the blob to be formed + _ = self.blob(timeout=timeout) + if content_type == ContentType.USER_CONTENT: + for path, arcname in self.user_tuples(): + if name == arcname: + return open(path, "rb").read() + return None + elif content_type in ( + ContentType.CODE_CONTENT, + ContentType.MODULE_CONTENT, + ContentType.OTHER_CONTENT, + ): + mangled_name = self._mfcontent.get_archive_filename(name, content_type) + for path_or_bytes, arcname in self._mfcontent.contents(content_type): + if mangled_name == arcname: + if isinstance(path_or_bytes, bytes): + # In case this is generated content like an INFO file + return path_or_bytes + # Otherwise, it is a file path + return open(path_or_bytes, "rb").read() + return None + raise ValueError(f"Unknown content type: {content_type}") + + def extract_into( + self, + dest_dir: str = ".", + content_types: int = ContentType.ALL_CONTENT.value, + timeout: Optional[float] = None, + ): + """ + Method to extract the package (or some of the files) into a directory. + + Parameters + ---------- + dest_dir : str, default "." + The directory to extract the package into. + content_types : int, default ALL_CONTENT + The types of content to extract. + """ + _ = self.blob(timeout=timeout) # Ensure the package is created + member_list = [] + if content_types & ContentType.USER_CONTENT.value: + member_list.extend( + [(m[0], os.path.join(dest_dir, m[1])) for m in self.user_tuples()] + ) + if content_types & ( + ContentType.CODE_CONTENT.value | ContentType.MODULE_CONTENT.value + ): + # We need to get the name of the files in the content archive to extract + member_list.extend( + [ + (m[0], os.path.join(dest_dir, m[1])) + for m in self._mfcontent.content_names( + content_types & ~ContentType.OTHER_CONTENT.value + ) + ] + ) + for orig_path, new_path in member_list: + os.makedirs(os.path.dirname(new_path), exist_ok=True) + # TODO: In case there are duplicate files -- that should not be the case + # but there is a bug currently with internal Netflix code. + if not os.path.exists(new_path): + os.symlink(orig_path, new_path) + # Could copy files as well if we want to split them out. 
+ # shutil.copy(orig_path, new_path) + # OTHER_CONTENT requires special handling because sometimes the file isn't a file + # but generated content + member_list = [] + if content_types & ContentType.OTHER_CONTENT.value: + member_list.extend( + [ + (m[0], os.path.join(dest_dir, m[1])) + for m in self._mfcontent.contents(ContentType.OTHER_CONTENT) + ] + ) + for path_or_content, new_path in member_list: + os.makedirs(os.path.dirname(new_path), exist_ok=True) + if not os.path.exists(new_path): + if isinstance(path_or_content, bytes): + with open(new_path, "wb") as f: + f.write(path_or_content) + else: + os.symlink(path_or_content, new_path) + + @staticmethod + def _format_size(size_in_bytes): + for unit in ["B", "KB", "MB", "GB", "TB"]: + if size_in_bytes < 1024.0: + return f"{size_in_bytes:.2f} {unit}" + size_in_bytes /= 1024.0 + return f"{size_in_bytes:.2f} PB" + + def _package_and_upload(self): + try: + # Can be called without a flow (Function) + if self._flow: + for step in self._flow: + for deco in step.decorators: + deco.package_init(self._flow, step.__name__, self._environment) + self._name = f"flow {self._flow.name}" + else: + self._name = "" + + # Add metacontent + self._mfcontent.add_info( + self._environment.get_environment_info(include_ext_info=True) + ) + + self._mfcontent.add_config(dump_config_values(self._flow)) + + # Add user files (from decorators and environment) + if self._flow: + self._add_addl_files() + self._cached_user_members = list(self._user_code_tuples()) + debug.package_exec( + f"User files to package: {self._cached_user_members}" + ) + + self._blob = self._make() + + if self._flow_datastore: + if len(self._blob) > 100 * 1024 * 1024: + self._echo( + f"Warning: The code package for {self._flow.name} is larger than " + f"100MB (found it to be {self._format_size(len(self._blob))}). " + "This may lead to slower upload times for remote runs and no " + "uploads for local runs. Consider reducing the package size. " + "Use `package info` or `package list` " + "to get more information about what is included in the package."
+ ) + self._blob_url, self._blob_sha = self._flow_datastore.save_data( + [self._blob], len_hint=1 + )[0] + else: + self._blob_url = self._blob_sha = "" + self._is_package_available = True + except Exception as e: + self._packaging_exception = e + self._echo(f"Package creation/upload failed for {self._flow.name}: {e}") + self._is_package_available = False + + def _add_addl_files(self): + # Look at all decorators that provide additional files + deco_module_paths = {} + addl_modules = set() + + def _check_tuple(path_tuple): + if len(path_tuple) == 2: + path_tuple = ( + path_tuple[0], + path_tuple[1], + ContentType.CODE_CONTENT, + ) + file_path, file_name, file_type = path_tuple + if file_type == ContentType.MODULE_CONTENT: + if file_path in addl_modules: + return None # Module was already added -- we don't add twice + addl_modules.add(file_path) + elif file_type in ( + ContentType.OTHER_CONTENT, + ContentType.CODE_CONTENT, + ): + path_tuple = (os.path.realpath(path_tuple[0]), path_tuple[1], file_type) + # These are files + # Check if the path is not duplicated as + # many steps can have the same packages being imported + if file_name not in deco_module_paths: + deco_module_paths[file_name] = file_path + elif deco_module_paths[file_name] != file_path: + raise NonUniqueFileNameToFilePathMappingException( + file_name, [deco_module_paths[file_name], file_path] + ) + else: + raise ValueError(f"Unknown file type: {file_type}") + return path_tuple + + def _add_tuple(path_tuple): + file_path, file_name, file_type = path_tuple + if file_type == ContentType.MODULE_CONTENT: + # file_path is actually a module + self._mfcontent.add_module(cast(ModuleType, file_path)) + elif file_type == ContentType.CODE_CONTENT: + self._mfcontent.add_code_file(file_path, file_name) + elif file_type == ContentType.OTHER_CONTENT: + self._mfcontent.add_other_file(file_path, file_name) + + for step in self._flow: + for deco in step.decorators: + for path_tuple in deco.add_to_package(): + path_tuple = _check_tuple(path_tuple) + if path_tuple is None: + continue + _add_tuple(path_tuple) + + # the package folders for environment + for path_tuple in self._environment.add_to_package(): + path_tuple = _check_tuple(path_tuple) + if path_tuple is None: + continue + _add_tuple(path_tuple) + + def _user_code_tuples(self): + if R.use_r(): + # the R working directory + self._user_flow_dir = R.working_dir() + for path_tuple in walk( + "%s/" % R.working_dir(), file_filter=self._user_code_filter + ): + yield path_tuple + # the R package + for path_tuple in R.package_paths(): + yield path_tuple + else: + # the user's working directory + flowdir = os.path.dirname(os.path.abspath(sys.argv[0])) + "/" + self._user_flow_dir = flowdir + for path_tuple in walk( + flowdir, + file_filter=self._user_code_filter, + exclude_tl_dirs=self._exclude_tl_dirs, + ): + # TODO: This is where we will check if the file is already included + # in the mfcontent portion + yield path_tuple + + def _make(self): + backend = self._backend() + with backend.create() as archive: + # Package the environment + for path_or_bytes, arcname in self._mfcontent.contents(): + if isinstance(path_or_bytes, str): + archive.add_file(path_or_bytes, arcname=arcname) + else: + archive.add_data(BytesIO(path_or_bytes), arcname=arcname) + + # Package the user code + for path, arcname in self._cached_user_members: + archive.add_file(path, arcname=arcname) + return backend.get_blob() + + def __str__(self): + return f"" diff --git a/metaflow/packaging_sys/__init__.py 
b/metaflow/packaging_sys/__init__.py new file mode 100644 index 00000000000..051dde566f6 --- /dev/null +++ b/metaflow/packaging_sys/__init__.py @@ -0,0 +1,846 @@ +import json +import os + +from enum import IntEnum +from types import ModuleType +from typing import ( + Any, + Dict, + Generator, + List, + Optional, + TYPE_CHECKING, + Tuple, + Type, + Union, +) + +from metaflow.packaging_sys.distribution_support import PackagedDistributionFinder + + +from .backend import PackagingBackend +from .tar_backend import TarPackagingBackend + +from ..util import get_metaflow_root + +MFCONTENT_MARKER = ".mf_install" + +if TYPE_CHECKING: + import metaflow.extension_support.metadata + + +class ContentType(IntEnum): + USER_CONTENT = ( + 0x1 # File being added is user code (ie: the directory with the flow file) + ) + CODE_CONTENT = ( + 0x2 # File being added is non-user code (libraries, metaflow itself, ...) + ) + MODULE_CONTENT = 0x4 # File being added is a python module + OTHER_CONTENT = 0x8 # File being added is a non-python file + + ALL_CONTENT = USER_CONTENT | CODE_CONTENT | MODULE_CONTENT | OTHER_CONTENT + + +class MFContent: + """ + Base class for all Metaflow code packages (non user code). + + A Metaflow code package, at a minimum, contains: + - a special INFO file (containing a bunch of metadata about the Metaflow environment) + - a special CONFIG file (containing user configurations for the flow) + + Declare all other MFContent subclasses (versions) here, overriding just the + functions that are not implemented here. Declare any other function for a + specific version in a *separate* file. + + NOTE: This file must remain as dependency-free as possible as it is loaded *very* + early on. This is why you must declare a *separate* class implementing what you want + the Metaflow code package (non user) to do. + """ + + _cached_mfcontent_info = {} + + _mappings = {} + + @classmethod + def get_info(cls) -> Optional[Dict[str, Any]]: + """ + Get the content of the special INFO file on the local filesystem after + the code package has been expanded. + + Returns + ------- + Optional[Dict[str, Any]] + The content of the INFO file -- None if there is no such file. + """ + mfcontent_info = cls._extract_mfcontent_info() + handling_cls = cls._get_mfcontent_class(mfcontent_info) + return handling_cls.get_info_impl(mfcontent_info) + + @classmethod + def get_config(cls) -> Optional[Dict[str, Any]]: + """ + Get the content of the special CONFIG file on the local filesystem after + the code package has been expanded. + + Returns + ------- + Optional[Dict[str, Any]] + The content of the CONFIG file -- None if there is no such file. + """ + mfcontent_info = cls._extract_mfcontent_info() + handling_cls = cls._get_mfcontent_class(mfcontent_info) + return handling_cls.get_config_impl(mfcontent_info) + + @classmethod + def get_filename(cls, filename: str, content_type: ContentType) -> Optional[str]: + """ + Get the path to a file extracted from the archive. The filename is the filename + passed in when creating the archive and content_type is the type of the content. + + This function will return the local path where the file can be found after + the package has been extracted. + + Parameters + ---------- + filename: str + The name of the file on the filesystem. + content_type: ContentType + The type of the content. + + Returns + ------- + str + The path to the file on the local filesystem or None if not found.
+ """ + mfcontent_info = cls._extract_mfcontent_info() + handling_cls = cls._get_mfcontent_class(mfcontent_info) + return handling_cls.get_filename_impl(mfcontent_info, filename, content_type) + + @classmethod + def get_archive_info( + cls, + archive: Any, + packaging_backend: Type[PackagingBackend] = TarPackagingBackend, + ) -> Optional[Dict[str, Any]]: + """ + Get the content of the special INFO file in the archive. + + Returns + ------- + Optional[Dict[str, Any]] + The content of the INFO file -- None if there is no such file. + """ + mfcontent_info = cls._extract_archive_mfcontent_info(archive, packaging_backend) + handling_cls = cls._get_mfcontent_class(mfcontent_info) + return handling_cls.get_archive_info_impl( + mfcontent_info, archive, packaging_backend + ) + + @classmethod + def get_archive_config( + cls, + archive: Any, + packaging_backend: Type[PackagingBackend] = TarPackagingBackend, + ) -> Optional[Dict[str, Any]]: + """ + Get the content of the special CONFIG file in the archive. + + Returns + ------- + Optional[Dict[str, Any]] + The content of the CONFIG file -- None if there is no such file. + """ + mfcontent_info = cls._extract_archive_mfcontent_info(archive, packaging_backend) + handling_cls = cls._get_mfcontent_class(mfcontent_info) + return handling_cls.get_archive_config_impl( + mfcontent_info, archive, packaging_backend + ) + + @classmethod + def get_archive_filename( + cls, + archive: Any, + filename: str, + content_type: ContentType, + packaging_backend: Type[PackagingBackend] = TarPackagingBackend, + ) -> Optional[str]: + """ + Get the filename of the archive. This does not do any extraction but simply + returns where, in the archive, the file is located. This is the equivalent of + get_filename but for files not extracted yet. + + Parameters + ---------- + archive: Any + The archive to get the filename from. + filename: str + The name of the file in the archive. + content_type: ContentType + The type of the content (e.g., code, other, etc.). + packaging_backend: Type[PackagingBackend], default TarPackagingBackend + The packaging backend to use. + + Returns + ------- + str + The filename of the archive or None if not found. + """ + mfcontent_info = cls._extract_archive_mfcontent_info(archive, packaging_backend) + handling_cls = cls._get_mfcontent_class(mfcontent_info) + return handling_cls.get_archive_filename_impl( + mfcontent_info, archive, filename, content_type, packaging_backend + ) + + @classmethod + def get_archive_content_names( + cls, + archive: Any, + content_types: Optional[int] = None, + packaging_backend: Type[PackagingBackend] = TarPackagingBackend, + ) -> List[str]: + mfcontent_info = cls._extract_archive_mfcontent_info(archive, packaging_backend) + handling_cls = cls._get_mfcontent_class(mfcontent_info) + return handling_cls.get_archive_content_names_impl( + mfcontent_info, archive, content_types, packaging_backend + ) + + @classmethod + def get_distribution_finder( + cls, + ) -> Optional["metaflow.extension_support.metadata.DistributionFinder"]: + """ + Get the distribution finder for the Metaflow code package (if applicable). + + Some packages will include distribution information to "pretend" that some packages + are actually distributions even if we just include them in the code package. + + Returns + ------- + Optional["metaflow.extension_support.metadata.DistributionFinder"] + The distribution finder for the Metaflow code package -- None if there is no + such finder. 
+ """ + mfcontent_info = cls._extract_mfcontent_info() + handling_cls = cls._get_mfcontent_class(mfcontent_info) + return handling_cls.get_distribution_finder_impl(mfcontent_info) + + @classmethod + def get_post_extract_env_vars( + cls, version_id: int, dest_dir: str = "." + ) -> Dict[str, str]: + """ + Get the post-extract environment variables that are needed to access the content + that has been extracted into dest_dir. + + This will typically involve setting PYTHONPATH. + + Parameters + ---------- + version_id: int + The version of MFContent for this package. + dest_dir: str, default "." + The directory where the content has been extracted to. + + Returns + ------- + Dict[str, str] + The post-extract environment variables that are needed to access the content + that has been extracted into extracted_dir. + """ + if version_id not in cls._mappings: + raise ValueError( + "Invalid package -- unknown version %s in info: %s" + % (version_id, cls._mappings) + ) + return cls._mappings[version_id].get_post_extract_env_vars_impl(dest_dir) + + # Implement the _impl methods in the base subclass (in this file). These need to + # happen with as few imports as possible to prevent circular dependencies. + @classmethod + def get_info_impl( + cls, mfcontent_info: Optional[Dict[str, Any]] + ) -> Optional[Dict[str, Any]]: + raise NotImplementedError("get_info_impl not implemented") + + @classmethod + def get_config_impl( + cls, mfcontent_info: Optional[Dict[str, Any]] + ) -> Optional[Dict[str, Any]]: + raise NotImplementedError("get_config_impl not implemented") + + @classmethod + def get_filename_impl( + cls, + mfcontent_info: Optional[Dict[str, Any]], + filename: str, + content_type: ContentType, + ) -> Optional[str]: + raise NotImplementedError("get_filename_impl not implemented") + + @classmethod + def get_distribution_finder_impl( + cls, mfcontent_info: Optional[Dict[str, Any]] + ) -> Optional["metaflow.extension_support.metadata.DistributionFinder"]: + raise NotImplementedError("get_distribution_finder_impl not implemented") + + @classmethod + def get_archive_info_impl( + cls, + mfcontent_info: Optional[Dict[str, Any]], + archive: Any, + packaging_backend: Type[PackagingBackend] = TarPackagingBackend, + ) -> Optional[Dict[str, Any]]: + raise NotImplementedError("get_archive_info_impl not implemented") + + @classmethod + def get_archive_config_impl( + cls, + mfcontent_info: Optional[Dict[str, Any]], + archive: Any, + packaging_backend: Type[PackagingBackend] = TarPackagingBackend, + ) -> Optional[Dict[str, Any]]: + raise NotImplementedError("get_archive_config_impl not implemented") + + @classmethod + def get_archive_filename_impl( + cls, + mfcontent_info: Optional[Dict[str, Any]], + archive: Any, + filename: str, + content_type: ContentType, + packaging_backend: Type[PackagingBackend] = TarPackagingBackend, + ) -> Optional[str]: + raise NotImplementedError("get_archive_filename_impl not implemented") + + @classmethod + def get_archive_content_names_impl( + cls, + mfcontent_info: Optional[Dict[str, Any]], + archive: Any, + content_types: Optional[int] = None, + packaging_backend: Type[PackagingBackend] = TarPackagingBackend, + ) -> List[str]: + raise NotImplementedError("get_archive_content_names_impl not implemented") + + @classmethod + def get_post_extract_env_vars_impl(cls, dest_dir: str) -> Dict[str, str]: + raise NotImplementedError("get_post_extract_env_vars_impl not implemented") + + def __init_subclass__(cls, version_id, **kwargs) -> None: + super().__init_subclass__(**kwargs) + if 
version_id in MFContent._mappings: + raise ValueError( + "Version ID %s already exists in MFContent mappings " + "-- this is a bug in Metaflow." % str(version_id) + ) + MFContent._mappings[version_id] = cls + cls._version_id = version_id + + # Implement these methods in sub-classes of the base sub-classes. These methods + # are called later and can have more dependencies and so can live in other files. + def get_excluded_tl_entries(self) -> List[str]: + """ + When packaging Metaflow from within an executing Metaflow flow, we may need to + exclude the files inserted by this content from being packaged again. + + Use this function to return these files or top-level directories. + + Returns + ------- + List[str] + Files or directories to exclude + """ + return [] + + def content_names( + self, content_types: Optional[int] = None + ) -> Generator[Tuple[str, str], None, None]: + """ + Detailed list of the content of this MFContent. This will list all files + (or non files -- for the INFO or CONFIG data for example) present in the archive. + + Parameters + ---------- + content_types : Optional[int] + The type of content to get the names of. If None, all content is returned. + + Yields + ------ + Generator[Tuple[str, str], None, None] + Path on the filesystem and the name in the archive + """ + raise NotImplementedError("content_names not implemented") + + def contents( + self, content_types: Optional[int] = None + ) -> Generator[Tuple[Union[bytes, str], str], None, None]: + """ + Very similar to content_names but also returns the content of the non-files, + as bytes. For files, the output is identical to content_names. + + Parameters + ---------- + content_types : Optional[int] + The type of content to get the content of. If None, all content is returned. + + Yields + ------ + Generator[Tuple[Union[str, bytes], str], None, None] + Content of the MF content + """ + raise NotImplementedError("contents not implemented") + + def show(self) -> str: + """ + Returns a more human-readable string representation of the content of this + MFContent. This will not, for example, list all files but summarize what + is included at a higher level.
+ + Returns + ------- + str + A human-readable string representation of the content of this MFContent + """ + raise NotImplementedError("show not implemented") + + def add_info(self, info: Dict[str, Any]) -> None: + """ + Add the content of the INFO file to the Metaflow content + + Parameters + ---------- + info: Dict[str, Any] + The content of the INFO file + """ + raise NotImplementedError("add_info not implemented") + + def add_config(self, config: Dict[str, Any]) -> None: + """ + Add the content of the CONFIG file to the Metaflow content + + Parameters + ---------- + config: Dict[str, Any] + The content of the CONFIG file + """ + raise NotImplementedError("add_config not implemented") + + def add_module(self, module_path: ModuleType) -> None: + """ + Add a python module to the Metaflow content + + Parameters + ---------- + module_path: ModuleType + The module to add + """ + raise NotImplementedError("add_module not implemented") + + def add_code_file(self, file_path: str, file_name: str) -> None: + """ + Add a code file to the Metaflow content + + Parameters + ---------- + file_path: str + The path to the code file to add (on the filesystem) + file_name: str + The path in the archive to add the code file to + """ + raise NotImplementedError("add_code_file not implemented") + + def add_other_file(self, file_path: str, file_name: str) -> None: + """ + Add a non-python file to the Metaflow content + + Parameters + ---------- + file_path: str + The path to the file to add (on the filesystem) + file_name: str + The path in the archive to add the file to + """ + raise NotImplementedError("add_other_file not implemented") + + @classmethod + def _get_mfcontent_class(cls, info: Optional[Dict[str, Any]]) -> Type["MFContent"]: + if info is None: + return MFContentV0 + if "version" not in info: + raise ValueError("Invalid package -- missing version in info: %s" % info) + version = info["version"] + if version not in cls._mappings: + raise ValueError( + "Invalid package -- unknown version %s in info: %s" % (version, info) + ) + + return cls._mappings[version] + + @classmethod + def _extract_archive_mfcontent_info( + cls, + archive: Any, + packaging_backend: Type[PackagingBackend] = TarPackagingBackend, + ) -> Optional[Dict[str, Any]]: + if id(archive) in cls._cached_mfcontent_info: + return cls._cached_mfcontent_info[id(archive)] + + mfcontent_info = None # type: Optional[Dict[str, Any]] + # Here we need to extract the information from the archive + if packaging_backend.cls_has_member(archive, MFCONTENT_MARKER): + # The MFCONTENT_MARKER file is present in the archive + # We can extract the information from it + extracted_info = packaging_backend.cls_get_member(archive, MFCONTENT_MARKER) + if extracted_info: + mfcontent_info = json.loads(extracted_info) + cls._cached_mfcontent_info[id(archive)] = mfcontent_info + return mfcontent_info + + @classmethod + def _extract_mfcontent_info(cls) -> Optional[Dict[str, Any]]: + if "_local" in cls._cached_mfcontent_info: + return cls._cached_mfcontent_info["_local"] + + mfcontent_info = None # type: Optional[Dict[str, Any]] + if os.path.exists(os.path.join(get_metaflow_root(), MFCONTENT_MARKER)): + with open( + os.path.join(get_metaflow_root(), MFCONTENT_MARKER), + "r", + encoding="utf-8", + ) as f: + mfcontent_info = json.load(f) + cls._cached_mfcontent_info["_local"] = mfcontent_info + return mfcontent_info + + def get_package_version(self) -> int: + """ + Get the version of MFContent for this package. 
+ """ + # _version_id is set in __init_subclass__ when the subclass is created + return self._version_id + + +class MFContentV0(MFContent, version_id=0): + @classmethod + def get_info_impl( + cls, mfcontent_info: Optional[Dict[str, Any]] + ) -> Optional[Dict[str, Any]]: + path_to_file = os.path.join(get_metaflow_root(), "INFO") + if os.path.isfile(path_to_file): + with open(path_to_file, "r", encoding="utf-8") as f: + return json.load(f) + return None + + @classmethod + def get_config_impl( + cls, mfcontent_info: Optional[Dict[str, Any]] + ) -> Optional[Dict[str, Any]]: + path_to_file = os.path.join(get_metaflow_root(), "CONFIG") + if os.path.isfile(path_to_file): + with open(path_to_file, "r", encoding="utf-8") as f: + return json.load(f) + return None + + @classmethod + def get_filename_impl( + cls, + mfcontent_info: Optional[Dict[str, Any]], + filename: str, + content_type: ContentType, + ) -> Optional[str]: + """ + For V0, the filename is simply the filename passed in. + """ + path_to_file = os.path.join(get_metaflow_root(), filename) + if os.path.isfile(path_to_file): + return path_to_file + return None + + @classmethod + def get_distribution_finder_impl( + cls, mfcontent_info: Optional[Dict[str, Any]] + ) -> Optional["metaflow.extension_support.metadata.DistributionFinder"]: + return None + + @classmethod + def get_archive_info_impl( + cls, + mfcontent_info: Optional[Dict[str, Any]], + archive: Any, + packaging_backend: Type[PackagingBackend] = TarPackagingBackend, + ) -> Optional[Dict[str, Any]]: + info_content = packaging_backend.cls_get_member(archive, "INFO") + if info_content: + return json.loads(info_content) + return None + + @classmethod + def get_archive_config_impl( + cls, + mfcontent_info: Optional[Dict[str, Any]], + archive: Any, + packaging_backend: Type[PackagingBackend] = TarPackagingBackend, + ) -> Optional[Dict[str, Any]]: + info_content = packaging_backend.cls_get_member(archive, "CONFIG") + if info_content: + return json.loads(info_content) + return None + + @classmethod + def get_archive_filename_impl( + cls, + mfcontent_info: Optional[Dict[str, Any]], + archive: Any, + filename: str, + content_type: ContentType, + packaging_backend: Type[PackagingBackend] = TarPackagingBackend, + ) -> str: + if packaging_backend.cls_has_member(archive, filename): + # The file is present in the archive + return filename + return None + + @classmethod + def get_archive_content_names_impl( + cls, + mfcontent_info: Optional[Dict[str, Any]], + archive: Any, + content_types: Optional[int] = None, + packaging_backend: Type[PackagingBackend] = TarPackagingBackend, + ) -> List[str]: + """ + For V0, we use a static list of known files to classify the content + """ + known_prefixes = { + "metaflow/": ContentType.CODE_CONTENT.value, + "metaflow_extensions/": ContentType.CODE_CONTENT.value, + "INFO": ContentType.OTHER_CONTENT.value, + "CONFIG": ContentType.OTHER_CONTENT.value, + "conda.manifest": ContentType.OTHER_CONTENT.value, + "uv.lock": ContentType.OTHER_CONTENT.value, + "pyproject.toml": ContentType.OTHER_CONTENT.value, + # Used in nflx-metaflow-extensions + "condav2-1.cnd": ContentType.OTHER_CONTENT.value, + } + to_return = [] + for filename in packaging_backend.cls_list_members(archive): + for prefix, classification in known_prefixes.items(): + if ( + prefix[-1] == "/" and filename.startswith(prefix) + ) or prefix == filename: + if content_types & classification: + to_return.append(filename) + elif content_types & ContentType.USER_CONTENT.value: + # Everything else is user content + 
+
+
+class MFContentV1Base(MFContent, version_id=1):
+
+    _code_dir = ".mf_code"
+    _other_dir = ".mf_meta"
+    _info_file = "INFO"
+    _config_file = "CONFIG"
+    _dist_info_file = "DIST_INFO"
+
+    def __init_subclass__(cls, **kwargs) -> None:
+        # Override to prevent subclasses of MFContentV1Base from reaching
+        # MFContent.__init_subclass__ (which would try to re-register version_id 1)
+        return None
+
+    def __init__(self, code_dir: str, other_dir: str) -> None:
+        self._code_dir = code_dir
+        self._other_dir = other_dir
+
+    @classmethod
+    def _get_otherfile_path(
+        cls, mfcontent_info: Optional[Dict[str, Any]], filename: str, in_archive: bool
+    ) -> str:
+        if in_archive:
+            return os.path.join(cls._other_dir, filename)
+        return os.path.join(get_metaflow_root(), "..", cls._other_dir, filename)
+
+    @classmethod
+    def _get_codefile_path(
+        cls, mfcontent_info: Optional[Dict[str, Any]], filename: str, in_archive: bool
+    ) -> str:
+        if in_archive:
+            return os.path.join(cls._code_dir, filename)
+        return os.path.join(get_metaflow_root(), filename)
+
+    @classmethod
+    def get_info_impl(
+        cls, mfcontent_info: Optional[Dict[str, Any]]
+    ) -> Optional[Dict[str, Any]]:
+        path_to_file = cls._get_otherfile_path(
+            mfcontent_info, cls._info_file, in_archive=False
+        )
+        if os.path.isfile(path_to_file):
+            with open(path_to_file, "r", encoding="utf-8") as f:
+                return json.load(f)
+        return None
+
+    @classmethod
+    def get_config_impl(
+        cls, mfcontent_info: Optional[Dict[str, Any]]
+    ) -> Optional[Dict[str, Any]]:
+        path_to_file = cls._get_otherfile_path(
+            mfcontent_info, cls._config_file, in_archive=False
+        )
+        if os.path.isfile(path_to_file):
+            with open(path_to_file, "r", encoding="utf-8") as f:
+                return json.load(f)
+        return None
+
+    @classmethod
+    def get_filename_impl(
+        cls,
+        mfcontent_info: Optional[Dict[str, Any]],
+        filename: str,
+        content_type: ContentType,
+    ) -> Optional[str]:
+        if content_type == ContentType.CODE_CONTENT:
+            path_to_file = cls._get_codefile_path(
+                mfcontent_info, filename, in_archive=False
+            )
+        elif content_type in (ContentType.OTHER_CONTENT, ContentType.MODULE_CONTENT):
+            path_to_file = cls._get_otherfile_path(
+                mfcontent_info, filename, in_archive=False
+            )
+        else:
+            raise ValueError(
+                f"Invalid content type {content_type} for filename {filename}"
+            )
+        if os.path.isfile(path_to_file):
+            return path_to_file
+        return None
+
+    @classmethod
+    def get_distribution_finder_impl(
+        cls, mfcontent_info: Optional[Dict[str, Any]]
+    ) -> Optional["metaflow.extension_support.metadata.DistributionFinder"]:
+        path_to_file = cls._get_otherfile_path(
+            mfcontent_info, cls._dist_info_file, in_archive=False
+        )
+        if os.path.isfile(path_to_file):
+            with open(path_to_file, "r", encoding="utf-8") as f:
+                return PackagedDistributionFinder(json.load(f))
+        return None
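To make the `in_archive` distinction concrete, here is a sketch of the layout V1 assumes, using the class defaults above (illustrative only; the marker filename comes from `MFCONTENT_MARKER`, defined earlier in this file):

```python
# Sketch of the V1 layout implied by _get_codefile_path/_get_otherfile_path:
#
#   .mf_code/              <- code content; this directory is put on PYTHONPATH
#     metaflow/...            (and thus becomes get_metaflow_root() after extract)
#     <user flow files>
#     <MFCONTENT_MARKER>   <- JSON marker ({"version": 1, ...}) read at startup
#   .mf_meta/              <- "other" content
#     INFO
#     CONFIG
#     DIST_INFO
#
# On a live task the same helpers resolve against the filesystem instead:
from metaflow.packaging_sys import ContentType, MFContentV1Base

# Returns <metaflow_root>/../.mf_meta/INFO if it exists, else None
print(MFContentV1Base.get_filename_impl(None, "INFO", ContentType.OTHER_CONTENT))
```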
+
+    @classmethod
+    def get_archive_info_impl(
+        cls,
+        mfcontent_info: Optional[Dict[str, Any]],
+        archive: Any,
+        packaging_backend: Type[PackagingBackend] = TarPackagingBackend,
+    ) -> Optional[Dict[str, Any]]:
+        info_file = packaging_backend.cls_get_member(
+            archive,
+            cls._get_otherfile_path(mfcontent_info, cls._info_file, in_archive=True),
+        )
+        if info_file:
+            return json.loads(info_file)
+        return None
+
+    @classmethod
+    def get_archive_config_impl(
+        cls,
+        mfcontent_info: Optional[Dict[str, Any]],
+        archive: Any,
+        packaging_backend: Type[PackagingBackend] = TarPackagingBackend,
+    ) -> Optional[Dict[str, Any]]:
+        config_file = packaging_backend.cls_get_member(
+            archive,
+            cls._get_otherfile_path(mfcontent_info, cls._config_file, in_archive=True),
+        )
+        if config_file:
+            return json.loads(config_file)
+        return None
+
+    @classmethod
+    def get_archive_filename_impl(
+        cls,
+        mfcontent_info: Optional[Dict[str, Any]],
+        archive: Any,
+        filename: str,
+        content_type: ContentType,
+        packaging_backend: Type[PackagingBackend] = TarPackagingBackend,
+    ) -> Optional[str]:
+        # Note: we resolve the *in-archive* path here since we look the file up
+        # in the archive itself
+        if content_type == ContentType.CODE_CONTENT:
+            path_to_file = cls._get_codefile_path(
+                mfcontent_info, filename, in_archive=True
+            )
+        elif content_type in (ContentType.OTHER_CONTENT, ContentType.MODULE_CONTENT):
+            path_to_file = cls._get_otherfile_path(
+                mfcontent_info, filename, in_archive=True
+            )
+        else:
+            raise ValueError(
+                f"Invalid content type {content_type} for filename {filename}"
+            )
+        if packaging_backend.cls_has_member(archive, path_to_file):
+            # The file is present in the archive
+            return path_to_file
+        return None
+
+    @classmethod
+    def get_archive_content_names_impl(
+        cls,
+        mfcontent_info: Optional[Dict[str, Any]],
+        archive: Any,
+        content_types: Optional[int] = None,
+        packaging_backend: Type[PackagingBackend] = TarPackagingBackend,
+    ) -> List[str]:
+        if content_types is None:
+            content_types = ContentType.ALL_CONTENT.value
+        # module_files are stored with their in-archive (code-dir) prefix
+        module_content = set((mfcontent_info or {}).get("module_files", []))
+        to_return = []
+        for filename in packaging_backend.cls_list_members(archive) or []:
+            if filename.startswith(cls._other_dir) and (
+                content_types & ContentType.OTHER_CONTENT.value
+            ):
+                to_return.append(filename)
+            elif filename.startswith(cls._code_dir):
+                # Special case for the marker, which is "other" content even
+                # though it lives in the code directory.
+                if filename == f"{cls._code_dir}/{MFCONTENT_MARKER}":
+                    if content_types & ContentType.OTHER_CONTENT.value:
+                        to_return.append(filename)
+                    continue
+                # Here it is either module or code content
+                if filename in module_content:
+                    if content_types & ContentType.MODULE_CONTENT.value:
+                        to_return.append(filename)
+                elif content_types & ContentType.CODE_CONTENT.value:
+                    to_return.append(filename)
+            else:
+                if content_types & ContentType.USER_CONTENT.value:
+                    # Everything else is user content
+                    to_return.append(filename)
+        return to_return
+
+    @classmethod
+    def get_post_extract_env_vars_impl(cls, dest_dir: str) -> Dict[str, str]:
+        return {"PYTHONPATH": f"{dest_dir}/{cls._code_dir}"}
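A minimal sketch (not part of the patch) of how a remote entrypoint would consume this mapping after extracting a package; for V1 the value points PYTHONPATH at `<dest_dir>/.mf_code`:

```python
# Sketch: apply post-extract env vars before re-invoking user code.
import os

from metaflow.packaging_sys import MFContent

env_vars = MFContent.get_post_extract_env_vars(version_id=1, dest_dir="/tmp/mf")
for key, value in env_vars.items():
    # Prepend so the packaged code (/tmp/mf/.mf_code) wins over system paths
    existing = os.environ.get(key)
    os.environ[key] = value + (os.pathsep + existing if existing else "")
```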
+ if filename == f"{cls._code_dir}/{MFCONTENT_MARKER}": + if content_types & ContentType.OTHER_CONTENT.value: + to_return.append(filename) + else: + continue + # Here it is either module or code + if os.path.join(cls._code_dir, filename) in module_content: + if content_types & ContentType.MODULE_CONTENT.value: + to_return.append(filename) + elif content_types & ContentType.CODE_CONTENT.value: + to_return.append(filename) + else: + if content_types & ContentType.USER_CONTENT.value: + # Everything else is user content + to_return.append(filename) + return to_return + + @classmethod + def get_post_extract_env_vars_impl(cls, dest_dir: str) -> Dict[str, str]: + return {"PYTHONPATH": f"{dest_dir}/{cls._code_dir}"} diff --git a/metaflow/packaging_sys/backend.py b/metaflow/packaging_sys/backend.py new file mode 100644 index 00000000000..5e7b9a6b2d0 --- /dev/null +++ b/metaflow/packaging_sys/backend.py @@ -0,0 +1,114 @@ +from abc import ABC, abstractmethod +from io import BytesIO +from typing import Any, IO, List, Optional, Union + + +class PackagingBackend(ABC): + + _mappings = {} + + def __init_subclass__(cls, **kwargs): + super().__init_subclass__(**kwargs) + if cls._type in cls._mappings: + raise ValueError(f"PackagingBackend {cls._type} already exists") + cls._mappings[cls._type] = cls + + @classmethod + def get_backend(cls, name: str) -> "PackagingBackend": + if name not in cls._mappings: + raise ValueError(f"PackagingBackend {name} not found") + return cls._mappings[name] + + @classmethod + @property + def backend_type(cls) -> str: + return cls._type + + @classmethod + @abstractmethod + def get_extract_commands(cls, archive_name: str, dest_dir: str) -> List[str]: + pass + + def __init__(self): + self._archive = None + + @abstractmethod + def create(self) -> "PackagingBackend": + pass + + @abstractmethod + def add_file(self, filename: str, arcname: Optional[str] = None): + pass + + @abstractmethod + def add_data(self, data: BytesIO, arcname: str): + pass + + @abstractmethod + def close(self): + pass + + @abstractmethod + def get_blob(self) -> Optional[Union[bytes, bytearray]]: + pass + + @classmethod + @abstractmethod + def cls_open(cls, content: IO[bytes]) -> Any: + """Open the archive from the given content.""" + pass + + @classmethod + @abstractmethod + def cls_has_member(cls, archive: Any, name: str) -> bool: + pass + + @classmethod + @abstractmethod + def cls_get_member(cls, archive: Any, name: str) -> Optional[bytes]: + pass + + @classmethod + @abstractmethod + def cls_extract_members( + cls, + archive: Any, + members: Optional[List[str]] = None, + dest_dir: str = ".", + ) -> None: + pass + + @classmethod + @abstractmethod + def cls_list_members(cls, archive: Any) -> Optional[List[str]]: + pass + + def has_member(self, name: str) -> bool: + if self._archive: + return self.cls_has_member(self._archive, name) + raise ValueError("Cannot check for member in an uncreated archive") + + def get_member(self, name: str) -> Optional[bytes]: + if self._archive: + return self.cls_get_member(self._archive, name) + raise ValueError("Cannot get member from an uncreated archive") + + def extract_members( + self, members: Optional[List[str]] = None, dest_dir: str = "." 
+ ) -> None: + if self._archive: + self.cls_extract_members(self._archive, members, dest_dir) + else: + raise ValueError("Cannot extract from an uncreated archive") + + def list_members(self) -> Optional[List[str]]: + if self._archive: + return self.cls_list_members(self._archive) + raise ValueError("Cannot list members from an uncreated archive") + + def __enter__(self): + self.create() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() diff --git a/metaflow/packaging_sys/distribution_support.py b/metaflow/packaging_sys/distribution_support.py new file mode 100644 index 00000000000..f20f7c7713f --- /dev/null +++ b/metaflow/packaging_sys/distribution_support.py @@ -0,0 +1,153 @@ +# Support saving of distribution information so we can give it back to users even +# if we do not install those distributions. This is used to package distributions in +# the MFContent package and provide an experience as if the packages were installed +# system-wide. + +import os +import re +import sys +from pathlib import Path +from types import ModuleType +from typing import ( + Callable, + Dict, + List, + Mapping, + NamedTuple, + Optional, + Set, + TYPE_CHECKING, + Union, + cast, +) + +import inspect +from collections import defaultdict + +from ..extension_support import metadata +from ..util import get_metaflow_root + +if TYPE_CHECKING: + import pathlib + +_cached_distributions = None + +packages_distributions = None # type: Optional[Callable[[], Mapping[str, List[str]]]] +name_normalizer = re.compile(r"[-_.]+") + +if sys.version_info[:2] >= (3, 10): + packages_distributions = metadata.packages_distributions +else: + # This is the code present in 3.10+ -- we replicate here for other versions + def _packages_distributions() -> Mapping[str, List[str]]: + """ + Return a mapping of top-level packages to their + distributions. + """ + pkg_to_dist = defaultdict(list) + for dist in metadata.distributions(): + for pkg in _top_level_declared(dist) or _top_level_inferred(dist): + pkg_to_dist[pkg].append(dist.metadata["Name"]) + return dict(pkg_to_dist) + + def _top_level_declared(dist: metadata.Distribution) -> List[str]: + return (dist.read_text("top_level.txt") or "").split() + + def _topmost(name: "pathlib.PurePosixPath") -> Optional[str]: + """ + Return the top-most parent as long as there is a parent. + """ + top, *rest = name.parts + return top if rest else None + + def _get_toplevel_name(name: "pathlib.PurePosixPath") -> str: + return _topmost(name) or ( + # python/typeshed#10328 + inspect.getmodulename(name) # type: ignore + or str(name) + ) + + def _top_level_inferred(dist: "metadata.Distribution"): + opt_names = set(map(_get_toplevel_name, dist.files or [])) + + def importable_name(name): + return "." not in name + + return filter(importable_name, opt_names) + + packages_distributions = _packages_distributions + + +def modules_to_distributions() -> Dict[str, List[metadata.Distribution]]: + """ + Return a mapping of top-level modules to their distributions. + + Returns + ------- + Dict[str, List[metadata.Distribution]] + A mapping of top-level modules to their distributions. 
+ """ + global _cached_distributions + pd = cast(Callable[[], Mapping[str, List[str]]], packages_distributions) + if _cached_distributions is None: + _cached_distributions = { + k: [metadata.distribution(d) for d in v] for k, v in pd().items() + } + return _cached_distributions + + +_ModuleInfo = NamedTuple( + "_ModuleInfo", + [ + ("name", str), + ("root_paths", Set[str]), + ("module", ModuleType), + ("metaflow_module", bool), + ], +) + + +class PackagedDistribution(metadata.Distribution): + """ + A Python Package packaged within a MFContent. This allows users to use use importlib + as they would regularly and the packaged Python Package would be considered as a + distribution even if it really isn't (since it is just included in the PythonPath). + """ + + def __init__(self, root: str, content: Dict[str, str]): + self._root = Path(root) + self._content = content + + # Strongly inspired from PathDistribution in metadata.py + def read_text(self, filename: Union[str, os.PathLike]) -> Optional[str]: + if str(filename) in self._content: + return self._content[str(filename)] + return None + + read_text.__doc__ = metadata.Distribution.read_text.__doc__ + + # Returns a metadata.SimplePath but not always present in importlib.metadata libs so + # skipping return type. + def locate_file(self, path: Union[str, os.PathLike]): + return self._root / path + + +class PackagedDistributionFinder(metadata.DistributionFinder): + + def __init__(self, dist_info: Dict[str, Dict[str, str]]): + self._dist_info = dist_info + + def find_distributions(self, context=metadata.DistributionFinder.Context()): + if context.name is None: + # Yields all known distributions + for name, info in self._dist_info.items(): + yield PackagedDistribution( + os.path.join(get_metaflow_root(), name), info + ) + name = name_normalizer.sub("-", cast(str, context.name)).lower() + if name in self._dist_info: + yield PackagedDistribution( + os.path.join(get_metaflow_root(), cast(str, context.name)), + self._dist_info[name], + ) + return None diff --git a/metaflow/packaging_sys/tar_backend.py b/metaflow/packaging_sys/tar_backend.py new file mode 100644 index 00000000000..d56e8775ab6 --- /dev/null +++ b/metaflow/packaging_sys/tar_backend.py @@ -0,0 +1,87 @@ +import tarfile + +from io import BytesIO +from typing import IO, List, Optional, Union + +from .backend import PackagingBackend + + +class TarPackagingBackend(PackagingBackend): + + _type = "tgz" + + @classmethod + def get_extract_commands(cls, archive_name: str, dest_dir: str) -> List[str]: + return [ + f"TAR_OPTIONS='--warning=no-timestamp' tar -xzf {archive_name} -C {dest_dir}" + ] + + def __init__(self): + super().__init__() + self._buf = None + + def create(self): + self._buf = BytesIO() + self._archive = tarfile.open( + fileobj=self._buf, mode="w:gz", compresslevel=3, dereference=True + ) + return self + + def add_file(self, filename: str, arcname: Optional[str] = None): + info = self._archive.gettarinfo(filename, arcname) + # Setting this default to Dec 3, 2019 + info.mtime = 1575360000 + with open(filename, mode="rb") as f: + self._archive.addfile(info, f) + + def add_data(self, data: BytesIO, arcname: str): + info = tarfile.TarInfo(arcname) + data.seek(0) + info.size = len(data.getvalue()) + # Setting this default to Dec 3, 2019 + info.mtime = 1575360000 + self._archive.addfile(info, data) + + def close(self): + if self._archive: + self._archive.close() + + def get_blob(self) -> Optional[Union[bytes, bytearray]]: + if self._buf: + blob = bytearray(self._buf.getvalue()) + blob[4:8] 
diff --git a/metaflow/packaging_sys/tar_backend.py b/metaflow/packaging_sys/tar_backend.py
new file mode 100644
index 00000000000..d56e8775ab6
--- /dev/null
+++ b/metaflow/packaging_sys/tar_backend.py
@@ -0,0 +1,87 @@
+import tarfile
+
+from io import BytesIO
+from typing import IO, List, Optional, Union
+
+from .backend import PackagingBackend
+
+
+class TarPackagingBackend(PackagingBackend):
+
+    _type = "tgz"
+
+    @classmethod
+    def get_extract_commands(cls, archive_name: str, dest_dir: str) -> List[str]:
+        return [
+            f"TAR_OPTIONS='--warning=no-timestamp' tar -xzf {archive_name} -C {dest_dir}"
+        ]
+
+    def __init__(self):
+        super().__init__()
+        self._buf = None
+
+    def create(self):
+        self._buf = BytesIO()
+        self._archive = tarfile.open(
+            fileobj=self._buf, mode="w:gz", compresslevel=3, dereference=True
+        )
+        return self
+
+    def add_file(self, filename: str, arcname: Optional[str] = None):
+        info = self._archive.gettarinfo(filename, arcname)
+        # Pin mtime to a fixed date (Dec 3, 2019) so packages are reproducible
+        info.mtime = 1575360000
+        with open(filename, mode="rb") as f:
+            self._archive.addfile(info, f)
+
+    def add_data(self, data: BytesIO, arcname: str):
+        info = tarfile.TarInfo(arcname)
+        data.seek(0)
+        info.size = len(data.getvalue())
+        # Pin mtime to a fixed date (Dec 3, 2019) so packages are reproducible
+        info.mtime = 1575360000
+        self._archive.addfile(info, data)
+
+    def close(self):
+        if self._archive:
+            self._archive.close()
+
+    def get_blob(self) -> Optional[Union[bytes, bytearray]]:
+        if self._buf:
+            blob = bytearray(self._buf.getvalue())
+            # Zero out the 4 gzip mtime bytes (offsets 4-7) so that identical
+            # content always produces identical bytes
+            blob[4:8] = [0] * 4
+            return blob
+        return None
+
+    @classmethod
+    def cls_open(cls, content: IO[bytes]) -> tarfile.TarFile:
+        return tarfile.open(fileobj=content, mode="r:gz")
+
+    @classmethod
+    def cls_has_member(cls, archive: tarfile.TarFile, name: str) -> bool:
+        try:
+            archive.getmember(name)
+            return True
+        except KeyError:
+            return False
+
+    @classmethod
+    def cls_get_member(cls, archive: tarfile.TarFile, name: str) -> Optional[bytes]:
+        try:
+            member = archive.getmember(name)
+            return archive.extractfile(member).read()
+        except KeyError:
+            return None
+
+    @classmethod
+    def cls_extract_members(
+        cls,
+        archive: tarfile.TarFile,
+        members: Optional[List[str]] = None,
+        dest_dir: str = ".",
+    ) -> None:
+        archive.extractall(path=dest_dir, members=members)
+
+    @classmethod
+    def cls_list_members(cls, archive: tarfile.TarFile) -> Optional[List[str]]:
+        return archive.getnames() or None
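A minimal sketch (not part of the patch) showing why the pinned member mtimes and the zeroed gzip timestamp matter: two builds of the same content produce byte-identical blobs, so the content-addressed package SHA stays stable across rebuilds:

```python
from io import BytesIO

from metaflow.packaging_sys.tar_backend import TarPackagingBackend

def build_package() -> bytes:
    with TarPackagingBackend() as backend:
        backend.add_data(BytesIO(b"print('hello')"), "hello.py")
    return bytes(backend.get_blob())

# Two builds of the same content yield byte-identical archives
assert build_package() == build_package()
```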
diff --git a/metaflow/packaging_sys/utils.py b/metaflow/packaging_sys/utils.py
new file mode 100644
index 00000000000..9f1bae3b627
--- /dev/null
+++ b/metaflow/packaging_sys/utils.py
@@ -0,0 +1,91 @@
+import os
+from contextlib import contextmanager
+from typing import Callable, Generator, List, Optional, Tuple
+
+from ..util import to_unicode
+
+
+# this is os.walk(follow_symlinks=True) with cycle detection
+def walk_without_cycles(
+    top_root: str,
+    exclude_dirs: Optional[List[str]] = None,
+) -> Generator[Tuple[str, List[str]], None, None]:
+    seen = set()
+
+    default_skip_dirs = ["__pycache__"]
+
+    def _recurse(root, skip_dirs):
+        for parent, dirs, files in os.walk(root):
+            dirs[:] = [d for d in dirs if d not in skip_dirs]
+            for d in dirs:
+                path = os.path.join(parent, d)
+                if os.path.islink(path):
+                    # Breaking loops: never follow the same symlink twice
+                    #
+                    # NOTE: this also means that links to sibling links are
+                    # not followed. In this case:
+                    #
+                    #   x -> y
+                    #   y -> oo
+                    #   oo/real_file
+                    #
+                    # real_file is only included twice, not three times
+                    reallink = os.path.realpath(path)
+                    if reallink not in seen:
+                        seen.add(reallink)
+                        for x in _recurse(path, default_skip_dirs):
+                            yield x
+            yield parent, files
+
+    yield from _recurse(top_root, set(default_skip_dirs + (exclude_dirs or [])))
+
+
+def walk(
+    root: str,
+    exclude_hidden: bool = True,
+    file_filter: Optional[Callable[[str], bool]] = None,
+    exclude_tl_dirs: Optional[List[str]] = None,
+) -> Generator[Tuple[str, str], None, None]:
+    root = to_unicode(root)  # handle files/folders with non-ascii chars
+    prefixlen = len("%s/" % os.path.dirname(root))
+    for path, files in walk_without_cycles(root, exclude_tl_dirs):
+        if exclude_hidden and "/." in path:
+            continue
+        for fname in files:
+            if file_filter is None or file_filter(fname):
+                p = os.path.join(path, fname)
+                yield p, p[prefixlen:]
+
+
+def suffix_filter(suffixes: List[str]) -> Callable[[str], bool]:
+    """
+    Returns a filter that accepts dotfiles listed verbatim in `suffixes` and any
+    other file whose (lower-cased) name ends with one of the given suffixes.
+    """
+    suffixes = [s.lower() for s in suffixes]
+
+    def _filter(fname: str) -> bool:
+        fname = fname.lower()
+        return (fname[0] == "." and fname in suffixes) or (
+            fname[0] != "." and any(fname.endswith(suffix) for suffix in suffixes)
+        )
+
+    return _filter
+
+
+@contextmanager
+def with_dir(new_dir):
+    current_dir = os.getcwd()
+    os.chdir(new_dir)
+    try:
+        yield new_dir
+    finally:
+        # Restore the original working directory even if the body raises
+        os.chdir(current_dir)
diff --git a/metaflow/packaging_sys/v1.py b/metaflow/packaging_sys/v1.py
new file mode 100644
index 00000000000..9ec48509ed9
--- /dev/null
+++ b/metaflow/packaging_sys/v1.py
@@ -0,0 +1,444 @@
+import json
+import os
+import sys
+from pathlib import Path
+from types import ModuleType
+from typing import Any, Callable, Dict, Generator, List, Optional, Set, Tuple, Union
+
+from ..debug import debug
+from ..extension_support import (
+    EXT_EXCLUDE_SUFFIXES,
+    extension_info,
+    package_mfext_all,
+    package_mfext_all_descriptions,
+)
+from ..exception import MetaflowException
+from ..metaflow_version import get_version
+from ..util import get_metaflow_root
+from . import ContentType, MFCONTENT_MARKER, MFContentV1Base
+from .distribution_support import _ModuleInfo, modules_to_distributions
+from .utils import suffix_filter, walk
+
+
+class MFContentV1(MFContentV1Base):
+    METAFLOW_SUFFIXES_LIST = [".py", ".html", ".css", ".js"]
+
+    def __init__(
+        self,
+        code_dir: str = MFContentV1Base._code_dir,
+        other_dir: str = MFContentV1Base._other_dir,
+        criteria: Callable[[ModuleType], bool] = lambda x: True,
+    ):
+        super().__init__(code_dir, other_dir)
+
+        self._metaflow_root = get_metaflow_root()
+        self._metaflow_version = get_version()
+
+        self._criteria = criteria
+
+        # Find the modules we need to package: we first apply the criteria to all
+        # loaded modules and then package the most parent module that fits.
+        modules = filter(lambda x: criteria(x[1]), sys.modules.items())
+        # Ensure that we see the parent modules first
+        modules = sorted(modules, key=lambda x: x[0])
+        if modules:
+            last_prefix = modules[0][0]
+            new_modules = [modules[0]]
+            for name, mod in modules[1:]:
+                if name.startswith(last_prefix + "."):
+                    # This is a submodule of the last module; we can skip it
+                    continue
+                # Otherwise, we have a new top-level module
+                last_prefix = name
+                new_modules.append((name, mod))
+        else:
+            new_modules = []
+
+        self._modules = {
+            name: _ModuleInfo(
+                name,
+                set(
+                    Path(p).resolve().as_posix()
+                    for p in getattr(mod, "__path__", [mod.__file__])
+                ),
+                mod,
+                True,  # This is a Metaflow module (see filter below)
+            )
+            for (name, mod) in new_modules
+        }
+
+        # Filter the modules
+        self._modules = {
+            name: info for name, info in self._modules.items() if criteria(info.module)
+        }
+
+        # Contains metadata about the packaged distributions. This allows Metaflow
+        # to "fake" distribution information for packaged modules.
+        self._distmetainfo = {}  # type: Dict[str, Dict[str, str]]
+
+        # Maps an absolute path on the filesystem to the path of the file in the
+        # archive.
+        self._files = {}  # type: Dict[str, str]
+        self._files_from_modules = {}  # type: Dict[str, str]
+
+        self._other_files = {}  # type: Dict[str, str]
+        self._other_content = {}  # type: Dict[str, bytes]
+
+        debug.package_exec(f"Used system modules found: {str(self._modules)}")
+
+        # Populate with files from the third party modules
+        for k, v in self._modules.items():
+            self._files_from_modules.update(self._module_files(k, v.root_paths))
+
+        # Figure out the files to package for Metaflow and extensions
+        self._cached_metaflow_files = list(self._metaflow_distribution_files())
+        self._cached_metaflow_files.extend(list(self._metaflow_extension_files()))
+
+    def create_mfcontent_info(self) -> Dict[str, Any]:
+        return {"version": 1, "module_files": list(self._files_from_modules.values())}
+
+    def get_excluded_tl_entries(self) -> List[str]:
+        """
+        When packaging Metaflow from within an executing Metaflow flow, we may need
+        to exclude the files inserted by this content from being packaged again.
+
+        Use this function to return these files or top-level directories.
+
+        Returns
+        -------
+        List[str]
+            Files or directories to exclude
+        """
+        return [self._code_dir, self._other_dir]
+
+    def content_names(
+        self, content_types: Optional[int] = None
+    ) -> Generator[Tuple[str, str], None, None]:
+        """
+        Detailed list of the content of this MFContent. This will list all files
+        (and non-files -- the INFO or CONFIG data, for example) present in the archive.
+
+        Parameters
+        ----------
+        content_types : Optional[int]
+            The type of content to get the names of. If None, all content is returned.
+
+        Yields
+        ------
+        Generator[Tuple[str, str], None, None]
+            Path on the filesystem and the name in the archive
+        """
+        yield from self._content(content_types, generate_value=False)
+
+    def contents(
+        self, content_types: Optional[int] = None
+    ) -> Generator[Tuple[Union[bytes, str], str], None, None]:
+        """
+        Very similar to content_names but yields the content of non-files as bytes;
+        for files, the output is identical to content_names.
+
+        Parameters
+        ----------
+        content_types : Optional[int]
+            The type of content to get the content of. If None, all content is returned.
+
+        Yields
+        ------
+        Generator[Tuple[Union[str, bytes], str], None, None]
+            Content of the MF content
+        """
+        yield from self._content(content_types, generate_value=True)
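A minimal sketch (not part of the patch) of how `contents()` pairs with a `PackagingBackend`: filesystem paths go through `add_file`, generated blobs (INFO, CONFIG, DIST_INFO, the marker) through `add_data`:

```python
from io import BytesIO

from metaflow.packaging_sys.tar_backend import TarPackagingBackend
from metaflow.packaging_sys.v1 import MFContentV1

# Restrict packaged modules to Metaflow itself (criteria receives module objects)
mfcontent = MFContentV1(criteria=lambda m: m.__name__ == "metaflow")
with TarPackagingBackend() as backend:
    for content, arc_name in mfcontent.contents():
        if isinstance(content, bytes):
            backend.add_data(BytesIO(content), arc_name)
        else:
            backend.add_file(content, arc_name)
blob = backend.get_blob()  # ready to be saved in the flow datastore
```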
+
+    def show(self) -> str:
+        """
+        Returns a more human-readable string representation of the content of this
+        MFContent. This will not, for example, list all files but summarizes what
+        is included at a higher level.
+
+        Returns
+        -------
+        str
+            A human-readable string representation of the content of this MFContent
+        """
+        result = []
+        if self._metaflow_version:
+            result.append(f"\nMetaflow version: {self._metaflow_version}")
+        ext_info = extension_info()
+        if ext_info["installed"]:
+            result.append("\nMetaflow extensions packaged:")
+            for ext_name, ext_details in ext_info["installed"].items():
+                result.append(
+                    f"  - {ext_name} ({ext_details['extension_name']}) @ {ext_details['dist_version']}"
+                )
+
+        if self._modules:
+            mf_modules = []
+            other_modules = []
+            for name, info in self._modules.items():
+                if info.metaflow_module:
+                    mf_modules.append(f"  - {name} @ {', '.join(info.root_paths)}")
+                else:
+                    other_modules.append(f"  - {name} @ {', '.join(info.root_paths)}")
+            if mf_modules:
+                result.append("\nMetaflow modules:")
+                result.extend(mf_modules)
+            if other_modules:
+                result.append("\nNon-Metaflow packaged modules:")
+                result.extend(other_modules)
+
+        return "\n".join(result)
+
+    def add_info(self, info: Dict[str, Any]) -> None:
+        """
+        Add the content of the INFO file to the Metaflow content
+
+        Parameters
+        ----------
+        info: Dict[str, Any]
+            The content of the INFO file
+        """
+        info_file_path = os.path.join(self._other_dir, self._info_file)
+        if info_file_path in self._other_content:
+            raise MetaflowException("INFO file already present in the MF environment")
+        self._other_content[info_file_path] = json.dumps(info).encode("utf-8")
+
+    def add_config(self, config: Dict[str, Any]) -> None:
+        """
+        Add the content of the CONFIG file to the Metaflow content
+
+        Parameters
+        ----------
+        config: Dict[str, Any]
+            The content of the CONFIG file
+        """
+        config_file_path = os.path.join(self._other_dir, self._config_file)
+        if config_file_path in self._other_content:
+            raise MetaflowException("CONFIG file already present in the MF environment")
+        self._other_content[config_file_path] = json.dumps(config).encode("utf-8")
+
+    def add_module(self, module: ModuleType) -> None:
+        """
+        Add a Python module to the Metaflow content
+
+        Parameters
+        ----------
+        module: ModuleType
+            The module to add
+        """
+        name = module.__name__
+        debug.package_exec(f"Adding module {name} to the MF content")
+        # If the module is a single file, we handle this here by looking at __file__,
+        # which will point to the single file.
If it is an actual module, __path__ + # will contain the path(s) to the module + self._modules[name] = _ModuleInfo( + name, + set( + Path(p).resolve().as_posix() + for p in getattr(module, "__path__", [module.__file__]) + ), + module, + False, # This is not a Metaflow module (added by the user manually) + ) + self._files_from_modules.update( + self._module_files(name, self._modules[name].root_paths) + ) + + def add_code_file(self, file_path: str, file_name: str) -> None: + """ + Add a code file to the Metaflow content + + Parameters + ---------- + file_path: str + The path to the code file to add (on the filesystem) + file_name: str + The path in the archive to add the code file to + """ + file_path = os.path.realpath(file_path) + debug.package_exec( + f"Adding code file {file_path} as {file_name} to the MF content" + ) + + if file_path in self._files and self._files[file_path] != os.path.join( + self._code_dir, file_name.lstrip("/") + ): + raise MetaflowException( + "File '%s' is already present in the MF content with a different name: '%s'" + % (file_path, self._files[file_path]) + ) + self._files[file_path] = os.path.join(self._code_dir, file_name.lstrip("/")) + + def add_other_file(self, file_path: str, file_name: str) -> None: + """ + Add a non-python file to the Metaflow content + + Parameters + ---------- + file_path: str + The path to the file to add (on the filesystem) + file_name: str + The path in the archive to add the file to + """ + file_path = os.path.realpath(file_path) + debug.package_exec( + f"Adding other file {file_path} as {file_name} to the MF content" + ) + if file_path in self._other_files and self._other_files[ + file_path + ] != os.path.join(self._other_dir, file_name.lstrip("/")): + raise MetaflowException( + "File %s is already present in the MF content with a different name: %s" + % (file_path, self._other_files[file_path]) + ) + self._other_files[file_path] = os.path.join( + self._other_dir, file_name.lstrip("/") + ) + + def _content( + self, content_types: Optional[int] = None, generate_value: bool = False + ) -> Generator[Tuple[Union[str, bytes], str], None, None]: + from ..package import MetaflowPackage # Prevent circular dependency + + if content_types is None: + content_types = ContentType.ALL_CONTENT.value + + if content_types & ContentType.CODE_CONTENT.value: + yield from self._cached_metaflow_files + yield from self._files.items() + if content_types & ContentType.MODULE_CONTENT.value: + yield from self._files_from_modules.items() + if content_types & ContentType.OTHER_CONTENT.value: + yield from self._other_files.items() + if generate_value: + for k, v in self._other_content.items(): + yield v, k + # Include the distribution file too + yield json.dumps(self._distmetainfo).encode("utf-8"), os.path.join( + self._other_dir, self._dist_info_file + ) + yield json.dumps(self.create_mfcontent_info()).encode( + "utf-8" + ), os.path.join(self._code_dir, MFCONTENT_MARKER) + else: + for k in self._other_content.keys(): + yield "", k + yield "", os.path.join( + self._other_dir, self._dist_info_file + ) + yield "", os.path.join( + self._code_dir, MFCONTENT_MARKER + ) + + def _metaflow_distribution_files(self) -> Generator[Tuple[str, str], None, None]: + debug.package_exec("Including Metaflow from '%s'" % self._metaflow_root) + for path_tuple in walk( + os.path.join(self._metaflow_root, "metaflow"), + exclude_hidden=False, + file_filter=suffix_filter(self.METAFLOW_SUFFIXES_LIST), + ): + yield path_tuple[0], os.path.join(self._code_dir, path_tuple[1]) + + def 
_metaflow_extension_files(self) -> Generator[Tuple[str, str], None, None]: + # Metaflow extensions; for now, we package *all* extensions but this may change + # at a later date; it is possible to call `package_mfext_package` instead of + # `package_mfext_all` but in that case, make sure to also add a + # metaflow_extensions/__init__.py file to properly "close" the metaflow_extensions + # package and prevent other extensions from being loaded that may be + # present in the rest of the system + for path_tuple in package_mfext_all(): + yield path_tuple[0], os.path.join(self._code_dir, path_tuple[1]) + if debug.package: + ext_info = package_mfext_all_descriptions() + ext_info = { + k: {k1: v1 for k1, v1 in v.items() if k1 in ("root_paths",)} + for k, v in ext_info.items() + } + debug.package_exec(f"Metaflow extensions packaged: {ext_info}") + + def _module_files( + self, name: str, paths: Set[str] + ) -> Generator[Tuple[str, str], None, None]: + debug.package_exec( + " Looking for distributions for module %s in %s" % (name, paths) + ) + paths = set(paths) # Do not modify external paths + has_init = False + distributions = modules_to_distributions().get(name) + prefix_parts = tuple(name.split(".")) + + seen_distributions = set() + if distributions: + for dist in distributions: + dist_name = dist.metadata["Name"] # dist.name not always present + if dist_name in seen_distributions: + continue + # For some reason, sometimes the same distribution appears twice. We + # don't need to process twice. + seen_distributions.add(dist_name) + debug.package_exec( + " Including distribution '%s' for module '%s'" + % (dist_name, name) + ) + dist_root = str(dist.locate_file(name)) + if dist_root not in paths: + # This is an error because it means that this distribution is + # not contributing to the module. + raise RuntimeError( + "Distribution '%s' is not contributing to module '%s' as " + "expected (got '%s' when expected one of %s)" + % (dist.metadata["Name"], name, dist_root, paths) + ) + paths.discard(dist_root) + if dist_name not in self._distmetainfo: + # Possible that a distribution contributes to multiple modules + self._distmetainfo[dist_name] = { + # We can add more if needed but these are likely the most + # useful (captures, name, version, etc and files which can + # be used to find non-python files in the distribution). + "METADATA": dist.read_text("METADATA") or "", + "RECORD": dist.read_text("RECORD") or "", + } + for file in dist.files or []: + # Skip files that do not belong to this module (distribution may + # provide multiple modules) + if file.parts[: len(prefix_parts)] != prefix_parts: + continue + if file.parts[len(prefix_parts)] == "__init__.py": + has_init = True + yield str( + dist.locate_file(file).resolve().as_posix() + ), os.path.join(self._code_dir, *prefix_parts, *file.parts[1:]) + + # Now if there are more paths left in paths, it means there is a non-distribution + # component to this package which we also include. 
+ debug.package_exec( + " Looking for non-distribution files for module '%s' in %s" + % (name, paths) + ) + for path in paths: + if not Path(path).is_dir(): + # Single file for the module -- this will be something like .py + yield path, os.path.join( + self._code_dir, *prefix_parts[:-1], f"{prefix_parts[-1]}.py" + ) + has_init = True + else: + for root, _, files in os.walk(path): + for file in files: + if any(file.endswith(x) for x in EXT_EXCLUDE_SUFFIXES): + continue + rel_path = os.path.relpath(os.path.join(root, file), path) + if rel_path == "__init__.py": + has_init = True + yield os.path.join(root, file), os.path.join( + self._code_dir, + name, + rel_path, + ) + # We now include an empty __init__.py file to close the module and prevent + # leaks from possible namespace packages + if not has_init: + yield os.path.join( + self._metaflow_root, "metaflow", "extension_support", "_empty_file.py" + ), os.path.join(self._code_dir, *prefix_parts, "__init__.py") diff --git a/metaflow/plugins/airflow/airflow.py b/metaflow/plugins/airflow/airflow.py index 304fa9f3bd9..1c5ce7c7dbc 100644 --- a/metaflow/plugins/airflow/airflow.py +++ b/metaflow/plugins/airflow/airflow.py @@ -66,6 +66,7 @@ def __init__( name, graph, flow, + code_package_metadata, code_package_sha, code_package_url, metadata, @@ -87,6 +88,7 @@ def __init__( self.name = name self.graph = graph self.flow = flow + self.code_package_metadata = code_package_metadata self.code_package_sha = code_package_sha self.code_package_url = code_package_url self.metadata = metadata @@ -372,6 +374,7 @@ def _to_job(self, node): # Technically the "user" is the stakeholder but should these labels be present. } additional_mf_variables = { + "METAFLOW_CODE_METADATA": self.code_package_metadata, "METAFLOW_CODE_SHA": self.code_package_sha, "METAFLOW_CODE_URL": self.code_package_url, "METAFLOW_CODE_DS": self.flow_datastore.TYPE, @@ -476,6 +479,7 @@ def _to_job(self, node): node.name, AIRFLOW_MACROS.create_task_id(self.contains_foreach), AIRFLOW_MACROS.ATTEMPT, + code_package_metadata=self.code_package_metadata, code_package_url=self.code_package_url, step_cmds=self._step_cli( node, input_paths, self.code_package_url, user_code_retries diff --git a/metaflow/plugins/airflow/airflow_cli.py b/metaflow/plugins/airflow/airflow_cli.py index 63b96b46a24..896061342b7 100644 --- a/metaflow/plugins/airflow/airflow_cli.py +++ b/metaflow/plugins/airflow/airflow_cli.py @@ -292,16 +292,21 @@ def make_flow( # Save the code package in the flow datastore so that both user code and # metaflow package can be retrieved during workflow execution. 
obj.package = MetaflowPackage( - obj.flow, obj.environment, obj.echo, obj.package_suffixes + obj.flow, + obj.environment, + obj.echo, + suffixes=obj.package_suffixes, + flow_datastore=obj.flow_datastore, ) - package_url, package_sha = obj.flow_datastore.save_data( - [obj.package.blob], len_hint=1 - )[0] + # This blocks until the package is created + package_url = obj.package.package_url() + package_sha = obj.package.package_sha() return Airflow( dag_name, obj.graph, obj.flow, + obj.package.package_metadata, package_sha, package_url, obj.metadata, diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index 98b62a9523c..84d01d58afd 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -90,6 +90,7 @@ def __init__( name, graph: FlowGraph, flow, + code_package_metadata, code_package_sha, code_package_url, production_token, @@ -142,6 +143,7 @@ def __init__( self.name = name self.graph = graph self.flow = flow + self.code_package_metadata = code_package_metadata self.code_package_sha = code_package_sha self.code_package_url = code_package_url self.production_token = production_token @@ -550,7 +552,7 @@ def _process_parameters(self): type=param_type, description=param.kwargs.get("help"), is_required=is_required, - **extra_attrs + **extra_attrs, ) return parameters @@ -1570,7 +1572,9 @@ def _container_templates(self): mflog_expr, ] + self.environment.get_package_commands( - self.code_package_url, self.flow_datastore.TYPE + self.code_package_metadata, + self.code_package_url, + self.flow_datastore.TYPE, ) ) step_cmds = self.environment.bootstrap_commands( @@ -1748,6 +1752,7 @@ def _container_templates(self): **{ # These values are needed by Metaflow to set it's internal # state appropriately. + "METAFLOW_CODE_METADATA": self.code_package_metadata, "METAFLOW_CODE_URL": self.code_package_url, "METAFLOW_CODE_SHA": self.code_package_sha, "METAFLOW_CODE_DS": self.flow_datastore.TYPE, @@ -2410,7 +2415,9 @@ def _error_msg_capture_hook_templates(self): mflog_expr, ] + self.environment.get_package_commands( - self.code_package_url, self.flow_datastore.TYPE + self.code_package_metadata, + self.code_package_url, + self.flow_datastore.TYPE, )[:-1] # Replace the line 'Task in starting' # FIXME: this can be brittle. @@ -2430,6 +2437,7 @@ def _error_msg_capture_hook_templates(self): env = { # These values are needed by Metaflow to set it's internal # state appropriately. + "METAFLOW_CODE_METADATA": self.code_package_metadata, "METAFLOW_CODE_URL": self.code_package_url, "METAFLOW_CODE_SHA": self.code_package_sha, "METAFLOW_CODE_DS": self.flow_datastore.TYPE, @@ -2874,7 +2882,9 @@ def _heartbeat_daemon_template(self): mflog_expr, ] + self.environment.get_package_commands( - self.code_package_url, self.flow_datastore.TYPE + self.code_package_metadata, + self.code_package_url, + self.flow_datastore.TYPE, )[:-1] # Replace the line 'Task in starting' # FIXME: this can be brittle. @@ -2889,6 +2899,7 @@ def _heartbeat_daemon_template(self): env = { # These values are needed by Metaflow to set it's internal # state appropriately. 
+ "METAFLOW_CODE_METADATA": self.code_package_metadata, "METAFLOW_CODE_URL": self.code_package_url, "METAFLOW_CODE_SHA": self.code_package_sha, "METAFLOW_CODE_DS": self.flow_datastore.TYPE, diff --git a/metaflow/plugins/argo/argo_workflows_cli.py b/metaflow/plugins/argo/argo_workflows_cli.py index d6b02397a97..7ccd86a26bf 100644 --- a/metaflow/plugins/argo/argo_workflows_cli.py +++ b/metaflow/plugins/argo/argo_workflows_cli.py @@ -518,16 +518,21 @@ def make_flow( # Save the code package in the flow datastore so that both user code and # metaflow package can be retrieved during workflow execution. obj.package = MetaflowPackage( - obj.flow, obj.environment, obj.echo, obj.package_suffixes + obj.flow, + obj.environment, + obj.echo, + suffixes=obj.package_suffixes, + flow_datastore=obj.flow_datastore, ) - package_url, package_sha = obj.flow_datastore.save_data( - [obj.package.blob], len_hint=1 - )[0] + # This blocks until the package is created + package_url = obj.package.package_url() + package_sha = obj.package.package_sha() return ArgoWorkflows( name, obj.graph, obj.flow, + obj.package.package_metadata, package_sha, package_url, token, diff --git a/metaflow/plugins/aws/batch/batch.py b/metaflow/plugins/aws/batch/batch.py index 16ce9a06cef..297858898c7 100644 --- a/metaflow/plugins/aws/batch/batch.py +++ b/metaflow/plugins/aws/batch/batch.py @@ -59,14 +59,24 @@ def __init__(self, metadata, environment): self._client = BatchClient() atexit.register(lambda: self.job.kill() if hasattr(self, "job") else None) - def _command(self, environment, code_package_url, step_name, step_cmds, task_spec): + def _command( + self, + environment, + code_package_metadata, + code_package_url, + step_name, + step_cmds, + task_spec, + ): mflog_expr = export_mflog_env_vars( datastore_type="s3", stdout_path=STDOUT_PATH, stderr_path=STDERR_PATH, **task_spec ) - init_cmds = environment.get_package_commands(code_package_url, "s3") + init_cmds = environment.get_package_commands( + code_package_metadata, code_package_url, "s3" + ) init_expr = " && ".join(init_cmds) step_expr = bash_capture_logs( " && ".join(environment.bootstrap_commands(step_name, "s3") + step_cmds) @@ -167,6 +177,7 @@ def create_job( step_name, step_cli, task_spec, + code_package_metadata, code_package_sha, code_package_url, code_package_ds, @@ -210,7 +221,12 @@ def create_job( .job_queue(queue) .command( self._command( - self.environment, code_package_url, step_name, [step_cli], task_spec + self.environment, + code_package_metadata, + code_package_url, + step_name, + [step_cli], + task_spec, ) ) .image(image) @@ -249,6 +265,7 @@ def create_job( ) .task_id(attrs.get("metaflow.task_id")) .environment_variable("AWS_DEFAULT_REGION", self._client.region()) + .environment_variable("METAFLOW_CODE_METADATA", code_package_metadata) .environment_variable("METAFLOW_CODE_SHA", code_package_sha) .environment_variable("METAFLOW_CODE_URL", code_package_url) .environment_variable("METAFLOW_CODE_DS", code_package_ds) @@ -334,6 +351,7 @@ def launch_job( step_name, step_cli, task_spec, + code_package_metadata, code_package_sha, code_package_url, code_package_ds, @@ -374,6 +392,7 @@ def launch_job( step_name, step_cli, task_spec, + code_package_metadata, code_package_sha, code_package_url, code_package_ds, diff --git a/metaflow/plugins/aws/batch/batch_cli.py b/metaflow/plugins/aws/batch/batch_cli.py index a2d2199e2e6..9d3eb4cbaa0 100644 --- a/metaflow/plugins/aws/batch/batch_cli.py +++ b/metaflow/plugins/aws/batch/batch_cli.py @@ -100,6 +100,7 @@ def kill(ctx, run_id, 
user, my_runs): "Metaflow." ) @click.argument("step-name") +@click.argument("code-package-metadata") @click.argument("code-package-sha") @click.argument("code-package-url") @click.option("--executable", help="Executable requirement for AWS Batch.") @@ -185,6 +186,7 @@ def kill(ctx, run_id, user, my_runs): def step( ctx, step_name, + code_package_metadata, code_package_sha, code_package_url, executable=None, @@ -317,6 +319,7 @@ def _sync_metadata(): step_name, step_cli, task_spec, + code_package_metadata, code_package_sha, code_package_url, ctx.obj.flow_datastore.TYPE, diff --git a/metaflow/plugins/aws/batch/batch_decorator.py b/metaflow/plugins/aws/batch/batch_decorator.py index 5c61d3f8f03..e40bf693108 100644 --- a/metaflow/plugins/aws/batch/batch_decorator.py +++ b/metaflow/plugins/aws/batch/batch_decorator.py @@ -126,6 +126,7 @@ class BatchDecorator(StepDecorator): "gpu": "0", "memory": "4096", } + package_metadata = None package_url = None package_sha = None run_time_limit = None @@ -189,7 +190,6 @@ def step_init(self, flow, graph, step, decos, environment, flow_datastore, logge self.logger = logger self.environment = environment self.step = step - self.flow_datastore = flow_datastore self.attributes.update( compute_resource_attributes(decos, self, self.resource_defaults) @@ -218,7 +218,7 @@ def runtime_task_created( self, task_datastore, task_id, split_index, input_paths, is_cloned, ubf_context ): if not is_cloned: - self._save_package_once(self.flow_datastore, self.package) + self._save_package_once(self.package) def runtime_step_cli( self, cli_args, retry_count, max_user_code_retries, ubf_context @@ -228,6 +228,7 @@ def runtime_step_cli( # to execute on AWS Batch anymore. We can execute possible fallback # code locally. cli_args.commands = ["batch", "step"] + cli_args.command_args.append(self.package_metadata) cli_args.command_args.append(self.package_sha) cli_args.command_args.append(self.package_url) cli_args.command_options.update(self.attributes) @@ -401,11 +402,12 @@ def _wait_for_mapper_tasks(self, flow, step_name): ) @classmethod - def _save_package_once(cls, flow_datastore, package): + def _save_package_once(cls, package): if cls.package_url is None: - cls.package_url, cls.package_sha = flow_datastore.save_data( - [package.blob], len_hint=1 - )[0] + # Blocks until the package is uploaded + cls.package_url = package.package_url() + cls.package_sha = package.package_sha() + cls.package_metadata = package.package_metadata def _setup_multinode_environment(): diff --git a/metaflow/plugins/aws/step_functions/step_functions.py b/metaflow/plugins/aws/step_functions/step_functions.py index ccf22b4fd35..213eaa9a91a 100644 --- a/metaflow/plugins/aws/step_functions/step_functions.py +++ b/metaflow/plugins/aws/step_functions/step_functions.py @@ -40,6 +40,7 @@ def __init__( name, graph, flow, + code_package_metadata, code_package_sha, code_package_url, production_token, @@ -59,6 +60,7 @@ def __init__( self.name = name self.graph = graph self.flow = flow + self.code_package_metadata = code_package_metadata self.code_package_sha = code_package_sha self.code_package_url = code_package_url self.production_token = production_token @@ -847,6 +849,7 @@ def _batch(self, node): node, input_paths, self.code_package_url, user_code_retries ), task_spec=task_spec, + code_package_metadata=self.code_package_metadata, code_package_sha=self.code_package_sha, code_package_url=self.code_package_url, code_package_ds=self.flow_datastore.TYPE, diff --git 
a/metaflow/plugins/aws/step_functions/step_functions_cli.py b/metaflow/plugins/aws/step_functions/step_functions_cli.py index 6965d15f9fc..8fdb768db4b 100644 --- a/metaflow/plugins/aws/step_functions/step_functions_cli.py +++ b/metaflow/plugins/aws/step_functions/step_functions_cli.py @@ -331,16 +331,21 @@ def make_flow( ) obj.package = MetaflowPackage( - obj.flow, obj.environment, obj.echo, obj.package_suffixes + obj.flow, + obj.environment, + obj.echo, + suffixes=obj.package_suffixes, + flow_datastore=obj.flow_datastore, ) - package_url, package_sha = obj.flow_datastore.save_data( - [obj.package.blob], len_hint=1 - )[0] + # This blocks until the package is created + package_url = obj.package.package_url() + package_sha = obj.package.package_sha() return StepFunctions( name, obj.graph, obj.flow, + obj.package.package_metadata, package_sha, package_url, token, diff --git a/metaflow/plugins/kubernetes/kubernetes.py b/metaflow/plugins/kubernetes/kubernetes.py index d8217808d2b..8d66673b4f8 100644 --- a/metaflow/plugins/kubernetes/kubernetes.py +++ b/metaflow/plugins/kubernetes/kubernetes.py @@ -90,6 +90,7 @@ def _command( step_name, task_id, attempt, + code_package_metadata, code_package_url, step_cmds, ): @@ -104,7 +105,7 @@ def _command( stderr_path=STDERR_PATH, ) init_cmds = self._environment.get_package_commands( - code_package_url, self._datastore.TYPE + code_package_metadata, code_package_url, self._datastore.TYPE ) init_expr = " && ".join(init_cmds) step_expr = bash_capture_logs( @@ -165,6 +166,7 @@ def create_jobset( task_id, attempt, user, + code_package_metadata, code_package_sha, code_package_url, code_package_ds, @@ -232,6 +234,7 @@ def create_jobset( qos=qos, security_context=security_context, ) + .environment_variable("METAFLOW_CODE_METADATA", code_package_metadata) .environment_variable("METAFLOW_CODE_SHA", code_package_sha) .environment_variable("METAFLOW_CODE_URL", code_package_url) .environment_variable("METAFLOW_CODE_DS", code_package_ds) @@ -415,6 +418,7 @@ def create_jobset( step_name=step_name, task_id=_tskid, attempt=attempt, + code_package_metadata=code_package_metadata, code_package_url=code_package_url, step_cmds=[ step_cli.replace( @@ -463,6 +467,7 @@ def create_job_object( task_id, attempt, user, + code_package_metadata, code_package_sha, code_package_url, code_package_ds, @@ -511,6 +516,7 @@ def create_job_object( step_name=step_name, task_id=task_id, attempt=attempt, + code_package_metadata=code_package_metadata, code_package_url=code_package_url, step_cmds=[step_cli], ), @@ -539,6 +545,7 @@ def create_job_object( qos=qos, security_context=security_context, ) + .environment_variable("METAFLOW_CODE_METADATA", code_package_metadata) .environment_variable("METAFLOW_CODE_SHA", code_package_sha) .environment_variable("METAFLOW_CODE_URL", code_package_url) .environment_variable("METAFLOW_CODE_DS", code_package_ds) diff --git a/metaflow/plugins/kubernetes/kubernetes_cli.py b/metaflow/plugins/kubernetes/kubernetes_cli.py index 38d43af75dd..e15f7b06cb9 100644 --- a/metaflow/plugins/kubernetes/kubernetes_cli.py +++ b/metaflow/plugins/kubernetes/kubernetes_cli.py @@ -41,6 +41,7 @@ def kubernetes(): ) @tracing.cli("kubernetes/step") @click.argument("step-name") +@click.argument("code-package-metadata") @click.argument("code-package-sha") @click.argument("code-package-url") @click.option( @@ -161,6 +162,7 @@ def kubernetes(): def step( ctx, step_name, + code_package_metadata, code_package_sha, code_package_url, executable=None, @@ -304,6 +306,7 @@ def _sync_metadata(): 
task_id=task_id, attempt=str(retry_count), user=util.get_username(), + code_package_metadata=code_package_metadata, code_package_sha=code_package_sha, code_package_url=code_package_url, code_package_ds=ctx.obj.flow_datastore.TYPE, diff --git a/metaflow/plugins/kubernetes/kubernetes_decorator.py b/metaflow/plugins/kubernetes/kubernetes_decorator.py index 9ffd9fd4955..80188a935e0 100644 --- a/metaflow/plugins/kubernetes/kubernetes_decorator.py +++ b/metaflow/plugins/kubernetes/kubernetes_decorator.py @@ -168,6 +168,7 @@ class KubernetesDecorator(StepDecorator): "qos": KUBERNETES_QOS, "security_context": None, } + package_metadata = None package_url = None package_sha = None run_time_limit = None @@ -322,7 +323,6 @@ def step_init(self, flow, graph, step, decos, environment, flow_datastore, logge self.logger = logger self.environment = environment self.step = step - self.flow_datastore = flow_datastore if ( self.attributes["qos"] is not None @@ -461,7 +461,7 @@ def runtime_task_created( # access to the code package. We store the package in the datastore # which the pod is able to download as part of it's entrypoint. if not is_cloned: - self._save_package_once(self.flow_datastore, self.package) + self._save_package_once(self.package) def runtime_step_cli( self, cli_args, retry_count, max_user_code_retries, ubf_context @@ -471,6 +471,7 @@ def runtime_step_cli( # to execute on Kubernetes anymore. We can execute possible fallback # code locally. cli_args.commands = ["kubernetes", "step"] + cli_args.command_args.append(self.package_metadata) cli_args.command_args.append(self.package_sha) cli_args.command_args.append(self.package_url) @@ -640,11 +641,12 @@ def task_finished( pass @classmethod - def _save_package_once(cls, flow_datastore, package): + def _save_package_once(cls, package): if cls.package_url is None: - cls.package_url, cls.package_sha = flow_datastore.save_data( - [package.blob], len_hint=1 - )[0] + # Blocks until the package is uploaded + cls.package_url = package.package_url() + cls.package_sha = package.package_sha() + cls.package_metadata = package.package_metadata # TODO: Unify this method with the multi-node setup in @batch diff --git a/metaflow/plugins/package_cli.py b/metaflow/plugins/package_cli.py index 505a3fdcb93..96c63270bb7 100644 --- a/metaflow/plugins/package_cli.py +++ b/metaflow/plugins/package_cli.py @@ -9,35 +9,30 @@ def cli(): @cli.group(help="Commands related to code packages.") +@click.option( + "--timeout", default=60, help="Timeout for package operations in seconds." +) @click.pass_obj -def package(obj): +def package(obj, timeout): # Prepare the package before any of the sub-commands are invoked. + # We explicitly will *not* upload it to the datastore. 
diff --git a/metaflow/plugins/package_cli.py b/metaflow/plugins/package_cli.py
index 505a3fdcb93..96c63270bb7 100644
--- a/metaflow/plugins/package_cli.py
+++ b/metaflow/plugins/package_cli.py
@@ -9,35 +9,30 @@ def cli():
 @cli.group(help="Commands related to code packages.")
+@click.option(
+    "--timeout", default=60, help="Timeout for package operations in seconds."
+)
 @click.pass_obj
-def package(obj):
+def package(obj, timeout):
     # Prepare the package before any of the sub-commands are invoked.
+    # We explicitly will *not* upload it to the datastore.
     obj.package = MetaflowPackage(
-        obj.flow, obj.environment, obj.echo, obj.package_suffixes
+        obj.flow,
+        obj.environment,
+        obj.echo,
+        suffixes=obj.package_suffixes,
+        flow_datastore=None,
     )
+    obj.package_op_timeout = timeout
 
 
-@package.command(help="Output information about the current code package.")
+@package.command(help="Output information about the code package.")
 @click.pass_obj
 def info(obj):
-    obj.echo("Status of the current working directory:", fg="magenta", bold=False)
-    obj.echo_always(
-        "Hash: *%s*" % sha1(obj.package.blob).hexdigest(),
-        highlight="green",
-        highlight_bold=False,
-    )
-    obj.echo_always(
-        "Package size: *%d* KB" % (len(obj.package.blob) / 1024),
-        highlight="green",
-        highlight_bold=False,
-    )
-    num = sum(1 for _ in obj.package.path_tuples())
-    obj.echo_always(
-        "Number of files: *%d*" % num, highlight="green", highlight_bold=False
-    )
+    obj.echo_always(obj.package.show())
 
 
-@package.command(help="List files included in the code package.")
+@package.command(help="List all files included in the code package.")
 @click.option(
     "--archive/--no-archive",
     default=False,
@@ -47,8 +42,10 @@ def info(obj):
 )
 @click.pass_obj
 def list(obj, archive=False):
+    _ = obj.package.blob(timeout=obj.package_op_timeout)
+    # Materializing the blob ensures all package information is available
     obj.echo(
-        "Files included in the code package " "(change with --package-suffixes):",
+        "Files included in the code package (change with --package-suffixes):",
         fg="magenta",
         bold=False,
     )
@@ -58,10 +55,15 @@ def list(obj, archive=False):
         obj.echo_always("\n".join(path for path, _ in obj.package.path_tuples()))
 
 
-@package.command(help="Save the current code package in a tar file")
+@package.command(help="Save the current code package to a file.")
 @click.argument("path")
 @click.pass_obj
 def save(obj, path):
     with open(path, "wb") as f:
-        f.write(obj.package.blob)
-    obj.echo("Code package saved in *%s*." % path, fg="magenta", bold=False)
+        f.write(obj.package.blob())
+    obj.echo(
+        "Code package saved in *%s* with metadata: %s"
+        % (path, obj.package.package_metadata),
+        fg="magenta",
+        bold=False,
+    )
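The reworked `package` group builds the package without uploading it and exposes a `--timeout` knob for blocking operations such as `blob()`. Assuming a flow file named `myflow.py` (hypothetical), the commands can be driven as in this sketch, kept in Python via `subprocess` so it stays self-contained:

```python
import subprocess

# "myflow.py" is a placeholder; substitute your own flow script.
for cmd in (
    ["python", "myflow.py", "package", "--timeout", "120", "info"],
    ["python", "myflow.py", "package", "save", "code_pkg.tgz"],
):
    subprocess.run(cmd, check=True)
```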
diff --git a/metaflow/plugins/pypi/bootstrap.py b/metaflow/plugins/pypi/bootstrap.py
index f244693f65e..ffa2f6b59f1 100644
--- a/metaflow/plugins/pypi/bootstrap.py
+++ b/metaflow/plugins/pypi/bootstrap.py
@@ -12,9 +12,10 @@
 from urllib.error import URLError
 from urllib.request import urlopen
 from metaflow.metaflow_config import DATASTORE_LOCAL_DIR, CONDA_USE_FAST_INIT
+from metaflow.packaging_sys import MFContent, ContentType
 from metaflow.plugins import DATASTORES
 from metaflow.plugins.pypi.utils import MICROMAMBA_MIRROR_URL, MICROMAMBA_URL
-from metaflow.util import which, get_metaflow_root
+from metaflow.util import which
 from urllib.request import Request
 import warnings
 
@@ -365,8 +366,11 @@ def fast_setup_environment(architecture, storage, env, prefix, pkgs_dir):
     # Move MAGIC_FILE inside local datastore.
     os.makedirs(manifest_dir, exist_ok=True)
+    path_to_manifest = MFContent.get_filename(MAGIC_FILE, ContentType.OTHER_CONTENT)
+    if path_to_manifest is None:
+        raise RuntimeError(f"Cannot find {MAGIC_FILE} in the package")
     shutil.move(
-        os.path.join(get_metaflow_root(), MAGIC_FILE),
+        path_to_manifest,
         os.path.join(manifest_dir, MAGIC_FILE),
     )
     with open(os.path.join(manifest_dir, MAGIC_FILE)) as f:
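Both bootstrap paths in this change resolve packaged files through `MFContent.get_filename` instead of assuming they sit next to the Metaflow root. The lookup-then-move shape generalizes; a sketch under the assumption that the file was shipped as `OTHER_CONTENT`:

```python
import os
import shutil

from metaflow.packaging_sys import ContentType, MFContent


def materialize(filename, dest_dir):
    # Resolve a file shipped in the code package; get_filename returns
    # None when the file was never packaged.
    src = MFContent.get_filename(filename, ContentType.OTHER_CONTENT)
    if src is None:
        raise RuntimeError(f"Cannot find {filename} in the package")
    os.makedirs(dest_dir, exist_ok=True)
    shutil.move(src, os.path.join(dest_dir, filename))
```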
diff --git a/metaflow/plugins/pypi/conda_decorator.py b/metaflow/plugins/pypi/conda_decorator.py
index c4d5bd5fb3a..e493ec5fec5 100644
--- a/metaflow/plugins/pypi/conda_decorator.py
+++ b/metaflow/plugins/pypi/conda_decorator.py
@@ -1,5 +1,3 @@
-import importlib
-import json
 import os
 import platform
 import re
@@ -7,12 +5,9 @@
 import tempfile
 
 from metaflow.decorators import FlowDecorator, StepDecorator
-from metaflow.extension_support import EXT_PKG
 from metaflow.metadata_provider import MetaDatum
 from metaflow.metaflow_environment import InvalidEnvironmentException
-from metaflow.util import get_metaflow_root
-
-from ...info_file import INFO_FILE
+from metaflow.packaging_sys import ContentType
 
 
 class CondaStepDecorator(StepDecorator):
@@ -45,6 +40,10 @@ class CondaStepDecorator(StepDecorator):
         "python": None,
         "disabled": None,
     }
+
+    _metaflow_home = None
+    _addl_env_vars = None
+
     # To define conda channels for the whole solve, users can specify
     # CONDA_CHANNELS in their environment. For pinning specific packages to specific
     # conda channels, users can specify channel::package as the package name.
@@ -152,67 +151,17 @@ def step_init(self, flow, graph, step, decos, environment, flow_datastore, logge
     def runtime_init(self, flow, graph, package, run_id):
         if self.disabled:
             return
-        # Create a symlink to metaflow installed outside the virtual environment.
-        self.metaflow_dir = tempfile.TemporaryDirectory(dir="/tmp")
-        os.symlink(
-            os.path.join(get_metaflow_root(), "metaflow"),
-            os.path.join(self.metaflow_dir.name, "metaflow"),
-        )
-
-        info = os.path.join(get_metaflow_root(), os.path.basename(INFO_FILE))
-        # Symlink the INFO file as well to properly propagate down the Metaflow version
-        if os.path.isfile(info):
-            os.symlink(
-                info, os.path.join(self.metaflow_dir.name, os.path.basename(INFO_FILE))
+        # We need to make the whole code package available to the user code in
+        # a temporary directory which will be added to the PYTHONPATH.
+        if self.__class__._metaflow_home is None:
+            # Do this ONCE per flow
+            self.__class__._metaflow_home = tempfile.TemporaryDirectory(dir="/tmp")
+            package.extract_into(
+                self.__class__._metaflow_home.name, ContentType.ALL_CONTENT
+            )
+            self.__class__._addl_env_vars = package.get_post_extract_env_vars(
+                package.package_metadata, self.__class__._metaflow_home.name
             )
-        else:
-            # If there is no info file, we will actually create one in this new
-            # place because we won't be able to properly resolve the EXT_PKG extensions
-            # the same way as outside conda (looking at distributions, etc.). In a
-            # Conda environment, as shown below (where we set self.addl_paths), all
-            # EXT_PKG extensions are PYTHONPATH extensions. Instead of re-resolving,
-            # we use the resolved information that is written out to the INFO file.
-            with open(
-                os.path.join(self.metaflow_dir.name, os.path.basename(INFO_FILE)),
-                mode="wt",
-                encoding="utf-8",
-            ) as f:
-                f.write(
-                    json.dumps(
-                        self.environment.get_environment_info(include_ext_info=True)
-                    )
-                )
-
-        # Support metaflow extensions.
-        self.addl_paths = None
-        try:
-            m = importlib.import_module(EXT_PKG)
-        except ImportError:
-            # No additional check needed because if we are here, we already checked
-            # for other issues when loading at the toplevel.
-            pass
-        else:
-            custom_paths = list(set(m.__path__))
-            # For some reason, at times, unique paths appear multiple times. We
-            # simplify to avoid un-necessary links.
-
-            if len(custom_paths) == 1:
-                # Regular package; we take a quick shortcut here.
-                os.symlink(
-                    custom_paths[0],
-                    os.path.join(self.metaflow_dir.name, EXT_PKG),
-                )
-            else:
-                # This is a namespace package, we therefore create a bunch of
-                # directories so that we can symlink in those separately, and we will
-                # add those paths to the PYTHONPATH for the interpreter. Note that we
-                # don't symlink to the parent of the package because that could end up
-                # including more stuff we don't want
-                self.addl_paths = []
-                for p in custom_paths:
-                    temp_dir = tempfile.mkdtemp(dir=self.metaflow_dir.name)
-                    os.symlink(p, os.path.join(temp_dir, EXT_PKG))
-                    self.addl_paths.append(temp_dir)
 
         # # Also install any environment escape overrides directly here to enable
         # # the escape to work even in non metaflow-created subprocesses
@@ -291,11 +240,15 @@ def runtime_step_cli(
         if self.disabled:
             return
         # Ensure local installation of Metaflow is visible to user code
-        python_path = self.metaflow_dir.name
-        if self.addl_paths is not None:
-            addl_paths = os.pathsep.join(self.addl_paths)
-            python_path = os.pathsep.join([addl_paths, python_path])
-        cli_args.env["PYTHONPATH"] = python_path
+        python_path = self.__class__._metaflow_home.name
+        addl_env_vars = {}
+        if self.__class__._addl_env_vars is not None:
+            for key, value in self.__class__._addl_env_vars.items():
+                if key == "PYTHONPATH":
+                    addl_env_vars[key] = os.pathsep.join([value, python_path])
+                else:
+                    addl_env_vars[key] = value
+        cli_args.env.update(addl_env_vars)
         if self.interpreter:
             # https://github.com/conda/conda/issues/7707
             # Also ref - https://github.com/Netflix/metaflow/pull/178
@@ -306,7 +259,9 @@ def runtime_step_cli(
     def runtime_finished(self, exception):
         if self.disabled:
             return
-        self.metaflow_dir.cleanup()
+        if self.__class__._metaflow_home is not None:
+            self.__class__._metaflow_home.cleanup()
+            self.__class__._metaflow_home = None
 
 
 class CondaFlowDecorator(FlowDecorator):
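In `runtime_step_cli` above, `PYTHONPATH` is the one variable that must be combined with the extraction directory rather than overwritten. A self-contained sketch of that merge, with hypothetical names:

```python
import os


def merge_post_extract_env(addl_env_vars, extract_dir, env):
    # Illustrative only: apply extraction-provided variables to a task
    # environment, keeping any provided PYTHONPATH entries ahead of the
    # extraction directory, exactly as the decorator does above.
    merged = dict(env)
    for key, value in (addl_env_vars or {}).items():
        if key == "PYTHONPATH":
            merged[key] = os.pathsep.join([value, extract_dir])
        else:
            merged[key] = value
    return merged


print(merge_post_extract_env({"PYTHONPATH": "/pkgs"}, "/tmp/mf_home", {}))
```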
diff --git a/metaflow/plugins/pypi/conda_environment.py b/metaflow/plugins/pypi/conda_environment.py
index 75a954a3023..7473ea3301d 100644
--- a/metaflow/plugins/pypi/conda_environment.py
+++ b/metaflow/plugins/pypi/conda_environment.py
@@ -17,6 +17,7 @@
 from metaflow.exception import MetaflowException
 from metaflow.metaflow_config import get_pinned_conda_libs
 from metaflow.metaflow_environment import MetaflowEnvironment
+from metaflow.packaging_sys import ContentType
 
 from . import MAGIC_FILE, _datastore_packageroot
 from .utils import conda_platform
@@ -470,7 +471,9 @@ def add_to_package(self):
         files = []
         manifest = self.get_environment_manifest_path()
         if os.path.exists(manifest):
-            files.append((manifest, os.path.basename(manifest)))
+            files.append(
+                (manifest, os.path.basename(manifest), ContentType.OTHER_CONTENT)
+            )
         return files
 
     def bootstrap_commands(self, step_name, datastore_type):
diff --git a/metaflow/plugins/uv/bootstrap.py b/metaflow/plugins/uv/bootstrap.py
index 619b85496c1..e91c6cb395b 100644
--- a/metaflow/plugins/uv/bootstrap.py
+++ b/metaflow/plugins/uv/bootstrap.py
@@ -1,4 +1,5 @@
 import os
+import shutil
 import subprocess
 import sys
 import time
@@ -6,6 +7,7 @@
 from metaflow.util import which
 from metaflow.info_file import read_info_file
 from metaflow.metaflow_config import get_pinned_conda_libs
+from metaflow.packaging_sys import MFContent, ContentType
 from urllib.request import Request, urlopen
 from urllib.error import URLError
 
@@ -93,6 +95,13 @@ def skip_metaflow_dependencies():
     return skip_pkgs
 
 
 def sync_uv_project(datastore_type):
+    # Move the files to the current directory so uv can find them.
+    for filename in ["uv.lock", "pyproject.toml"]:
+        path_to_file = MFContent.get_filename(filename, ContentType.OTHER_CONTENT)
+        if path_to_file is None:
+            raise RuntimeError(f"Could not find {filename} in the package.")
+        shutil.move(path_to_file, os.path.join(os.getcwd(), filename))
+
     print("Syncing uv project...")
     dependencies = " ".join(get_dependencies(datastore_type))
     skip_pkgs = " ".join(
diff --git a/metaflow/plugins/uv/uv_environment.py b/metaflow/plugins/uv/uv_environment.py
index cc361650802..3c4beed8bb5 100644
--- a/metaflow/plugins/uv/uv_environment.py
+++ b/metaflow/plugins/uv/uv_environment.py
@@ -2,6 +2,7 @@
 
 from metaflow.exception import MetaflowException
 from metaflow.metaflow_environment import MetaflowEnvironment
+from metaflow.packaging_sys import ContentType
 
 
 class UVException(MetaflowException):
@@ -43,8 +44,8 @@ def _find(filename):
         pyproject_path = _find("pyproject.toml")
         uv_lock_path = _find("uv.lock")
         files = [
-            (uv_lock_path, "uv.lock"),
-            (pyproject_path, "pyproject.toml"),
+            (uv_lock_path, "uv.lock", ContentType.OTHER_CONTENT),
+            (pyproject_path, "pyproject.toml", ContentType.OTHER_CONTENT),
         ]
         return files
diff --git a/metaflow/runner/click_api.py b/metaflow/runner/click_api.py
index cafeb7bab77..75193beb44c 100644
--- a/metaflow/runner/click_api.py
+++ b/metaflow/runner/click_api.py
@@ -658,6 +658,7 @@ def _method(_self, *args, **kwargs):
         .kubernetes()
         .step(
             step_name="process",
+            code_package_metadata="some_version",
             code_package_sha="some_sha",
             code_package_url="some_url",
         )
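With this change, every `add_to_package` hook returns `(local_path, name_in_archive, content_type)` triples instead of pairs, so the packager can route each file into the right content class. A sketch of a custom environment contributing one auxiliary file, with a hypothetical file name:

```python
import os

from metaflow.packaging_sys import ContentType


def add_to_package(self):
    # Return (local_path, name_in_archive, content_type) triples; the new
    # third element is what this diff adds to the hook's contract.
    files = []
    lockfile = "environment.lock"  # hypothetical auxiliary file
    if os.path.exists(lockfile):
        files.append(
            (lockfile, os.path.basename(lockfile), ContentType.OTHER_CONTENT)
        )
    return files
```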
diff --git a/metaflow/runtime.py b/metaflow/runtime.py
index 7e9269841fb..d807f2c558c 100644
--- a/metaflow/runtime.py
+++ b/metaflow/runtime.py
@@ -95,6 +95,7 @@ def __init__(
         max_num_splits=MAX_NUM_SPLITS,
         max_log_size=MAX_LOG_SIZE,
         resume_identifier=None,
+        skip_decorator_hooks=False,
     ):
         if run_id is None:
             self._run_id = metadata.new_run_id()
@@ -128,6 +129,7 @@ def __init__(
         self._ran_or_scheduled_task_index = set()
         self._reentrant = reentrant
         self._run_url = None
+        self._skip_decorator_hooks = skip_decorator_hooks
 
         # If steps_to_rerun is specified, we will not clone them in resume mode.
         self._steps_to_rerun = steps_to_rerun or {}
@@ -179,9 +181,10 @@ def __init__(
         # finished.
         self._control_num_splits = {}  # control_task -> num_splits mapping
 
-        for step in flow:
-            for deco in step.decorators:
-                deco.runtime_init(flow, graph, package, self._run_id)
+        if not self._skip_decorator_hooks:
+            for step in flow:
+                for deco in step.decorators:
+                    deco.runtime_init(flow, graph, package, self._run_id)
 
     def _new_task(self, step, input_paths=None, **kwargs):
         if input_paths is None:
@@ -192,7 +195,7 @@ def _new_task(self, step, input_paths=None, **kwargs):
         if step in self._steps_to_rerun:
             may_clone = False
 
-        if step == "_parameters":
+        if step == "_parameters" or self._skip_decorator_hooks:
             decos = []
         else:
             decos = getattr(self._flow, step).decorators
@@ -566,9 +569,10 @@ def execute(self):
             raise
         finally:
             # on finish clean tasks
-            for step in self._flow:
-                for deco in step.decorators:
-                    deco.runtime_finished(exception)
+            if not self._skip_decorator_hooks:
+                for step in self._flow:
+                    for deco in step.decorators:
+                        deco.runtime_finished(exception)
 
         # assert that end was executed and it was successful
         if ("end", ()) in self._finished:
diff --git a/metaflow/task.py b/metaflow/task.py
index 414b7e54710..3b0fab7aec9 100644
--- a/metaflow/task.py
+++ b/metaflow/task.py
@@ -538,6 +538,7 @@ def run_step(
             output.save_metadata(
                 {
                     "task_begin": {
+                        "code_package_metadata": os.environ.get("METAFLOW_CODE_METADATA"),
                         "code_package_sha": os.environ.get("METAFLOW_CODE_SHA"),
                         "code_package_ds": os.environ.get("METAFLOW_CODE_DS"),
                         "code_package_url": os.environ.get("METAFLOW_CODE_URL"),
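The `task_begin` record above means each task now captures which package format it ran from. On the execution side these values arrive as environment variables set by the launcher; a minimal sketch of collecting them (Python 3.9+ for `removeprefix`):

```python
import os


def code_package_info():
    # METAFLOW_CODE_METADATA is the new field in this change; the other
    # three predate it. Any of them may be unset for purely local runs.
    return {
        key.removeprefix("METAFLOW_CODE_").lower(): os.environ.get(key)
        for key in (
            "METAFLOW_CODE_METADATA",
            "METAFLOW_CODE_SHA",
            "METAFLOW_CODE_URL",
            "METAFLOW_CODE_DS",
        )
    }
```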
diff --git a/metaflow/user_configs/config_options.py b/metaflow/user_configs/config_options.py
index 5883a7fe76e..18b08355fca 100644
--- a/metaflow/user_configs/config_options.py
+++ b/metaflow/user_configs/config_options.py
@@ -7,8 +7,9 @@
 from metaflow._vendor import click
 from metaflow.debug import debug
 
-from .config_parameters import CONFIG_FILE, ConfigValue
+from .config_parameters import ConfigValue
 from ..exception import MetaflowException, MetaflowInternalError
+from ..packaging_sys import MFContent
 from ..parameters import DeployTimeField, ParameterContext, current_flow
 from ..util import get_username
 
@@ -24,12 +25,16 @@
 def _load_config_values(info_file: Optional[str] = None) -> Optional[Dict[Any, Any]]:
     if info_file is None:
-        info_file = os.path.basename(CONFIG_FILE)
-    try:
-        with open(info_file, encoding="utf-8") as contents:
-            return json.load(contents).get("user_configs", {})
-    except IOError:
-        return None
+        config_content = MFContent.get_config()
+    else:
+        try:
+            with open(info_file, encoding="utf-8") as f:
+                config_content = json.load(f)
+        except IOError:
+            return None
+    if config_content:
+        return config_content.get("user_configs", {})
+    return None
 
 
 class ConvertPath(click.Path):
@@ -437,7 +442,7 @@ class LocalFileInput(click.Path):
     # Small wrapper around click.Path to set the value from which to read configuration
     # values. This is set immediately upon processing the --local-config-file
     # option and will therefore then be available when processing any of the other
-    # --config options (which will call ConfigInput.process_configs
+    # --config options (which will call ConfigInput.process_configs)
 
     name = "LocalFileInput"
 
     def convert(self, value, param, ctx):
diff --git a/metaflow/util.py b/metaflow/util.py
index 8636b8253bc..c0383766b5d 100644
--- a/metaflow/util.py
+++ b/metaflow/util.py
@@ -9,7 +9,6 @@
 from itertools import takewhile
 import re
 
-from metaflow.exception import MetaflowUnknownUser, MetaflowInternalError
 
 try:
     # python2
@@ -162,6 +161,8 @@ def get_username():
 
 
 def resolve_identity_as_tuple():
+    from metaflow.exception import MetaflowUnknownUser
+
     prod_token = os.environ.get("METAFLOW_PRODUCTION_TOKEN")
     if prod_token:
         return "production", prod_token
@@ -236,6 +237,8 @@ class of the given object.
 
 
 def compress_list(lst, separator=",", rangedelim=":", zlibmarker="!", zlibmin=500):
+    from metaflow.exception import MetaflowInternalError
+
     bad_items = [x for x in lst if separator in x or rangedelim in x or zlibmarker in x]
     if bad_items:
         raise MetaflowInternalError(