Skip to content

[RfR] New packaging support #2461

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions metaflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from .metaflow_current import current
from metaflow.system import _system_monitor, _system_logger
from .metaflow_environment import MetaflowEnvironment
from .packaging_sys import MFContent
from .plugins import (
DATASTORES,
ENVIRONMENTS,
Expand Down Expand Up @@ -326,6 +327,11 @@ def start(
echo(" executing *%s*" % ctx.obj.flow.name, fg="magenta", nl=False)
echo(" for *%s*" % resolve_identity(), fg="magenta")

# Check if we need to set up the distribution finder; it is only registered
# when MFContent.get_distribution_finder() returns one.
dist_info = MFContent.get_distribution_finder()
if dist_info:
sys.meta_path.append(dist_info)

# Setup the context
cli_args._set_top_kwargs(ctx.params)
ctx.obj.echo = echo
Expand Down
1 change: 1 addition & 0 deletions metaflow/cli_components/init_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def init(obj, run_id=None, task_id=None, tags=None, **kwargs):
obj.event_logger,
obj.monitor,
run_id=run_id,
skip_decorator_hooks=True,
)
obj.flow._set_constants(obj.graph, kwargs, obj.config_options)
runtime.persist_constants(task_id=task_id)
6 changes: 5 additions & 1 deletion metaflow/cli_components/run_cmds.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ def before_run(obj, tags, decospecs):
# We explicitly avoid doing this in `start` since it is invoked for every
# step in the run.
obj.package = MetaflowPackage(
obj.flow, obj.environment, obj.echo, obj.package_suffixes
obj.flow,
obj.environment,
obj.echo,
suffixes=obj.package_suffixes,
flow_datastore=obj.flow_datastore,
)


Expand Down
54 changes: 24 additions & 30 deletions metaflow/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@
from metaflow.includefile import IncludedFile
from metaflow.metaflow_config import DEFAULT_METADATA, MAX_ATTEMPTS
from metaflow.metaflow_environment import MetaflowEnvironment
from metaflow.package import MetaflowPackage
from metaflow.packaging_sys import ContentType
from metaflow.packaging_sys.tar_backend import TarPackagingBackend
from metaflow.plugins import ENVIRONMENTS, METADATA_PROVIDERS
from metaflow.unbounded_foreach import CONTROL_TASK_TAG
from metaflow.util import cached_property, is_stringish, resolve_identity, to_unicode

from ..info_file import INFO_FILE
from .filecache import FileCache

if TYPE_CHECKING:
Expand Down Expand Up @@ -816,20 +818,27 @@ def __init__(self, flow_name: str, code_package: str):
self._path = info["location"]
self._ds_type = info["ds_type"]
self._sha = info["sha"]
self._code_metadata = info.get("metadata")
if self._code_metadata is None:
# Default string
self._code_metadata = (
'{"version": 0, "backend": "tgz", "mfcontent_version": 0}'
)
self._backend = MetaflowPackage.get_backend(self._code_metadata)

if filecache is None:
filecache = FileCache()
_, blobdata = filecache.get_data(
self._ds_type, self._flow_name, self._path, self._sha
)
code_obj = BytesIO(blobdata)
self._tar = tarfile.open(fileobj=code_obj, mode="r:gz")
# The JSON module in Python3 deals with Unicode. Tar gives bytes.
info_str = (
self._tar.extractfile(os.path.basename(INFO_FILE)).read().decode("utf-8")
)
self._info = json.loads(info_str)
self._flowspec = self._tar.extractfile(self._info["script"]).read()
self._code_obj = BytesIO(blobdata)
self._info = MetaflowPackage.cls_get_info(self._code_metadata, self._code_obj)
if self._info:
self._flowspec = MetaflowPackage.cls_get_content(
self._code_metadata, self._code_obj, self._info["script"]
)
else:
raise MetaflowInternalError("Code package metadata is invalid.")

@property
def path(self) -> str:
Expand Down Expand Up @@ -877,7 +886,9 @@ def tarball(self) -> tarfile.TarFile:
TarFile
TarFile for everything in this code package
"""
return self._tar
if self._backend == TarPackagingBackend:
return self._backend.cls_open(self._code_obj)
raise RuntimeError("Archive is not a tarball")

def extract(self) -> TemporaryDirectory:
"""
Expand Down Expand Up @@ -908,27 +919,10 @@ def extract(self) -> TemporaryDirectory:
The directory and its contents are automatically deleted when
this object is garbage collected.
"""
exclusions = [
"metaflow/",
"metaflow_extensions/",
"INFO",
"CONFIG_PARAMETERS",
"conda.manifest",
# This file is created when using the conda/pypi features available in
# nflx-metaflow-extensions: https://github.com/Netflix/metaflow-nflx-extensions
"condav2-1.cnd",
]
members = [
m
for m in self.tarball.getmembers()
if not any(
(x.endswith("/") and m.name.startswith(x)) or (m.name == x)
for x in exclusions
)
]

tmp = TemporaryDirectory()
self.tarball.extractall(tmp.name, members)
MetaflowPackage.cls_extract_into(
self._code_metadata, self._code_obj, tmp.name, ContentType.USER_CONTENT
)
return tmp

@property
Expand Down
5 changes: 5 additions & 0 deletions metaflow/debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ def log(self, typ, args):
filename = inspect.stack()[1][1]
print("debug[%s %s:%s]: %s" % (typ, filename, lineno, s), file=sys.stderr)

def __getattr__(self, name):
    # Defined so that pyright and other linters recognize that this object has
    # dynamic attributes.
    #
    # Python only calls __getattr__ after normal attribute lookup has already
    # failed, so calling getattr(self, name) from here would re-enter this
    # method until a RecursionError is raised. Raise AttributeError instead so
    # that hasattr() and getattr() with a default behave as usual for names
    # that were never set.
    raise AttributeError(
        "'%s' object has no attribute '%s'" % (type(self).__name__, name)
    )

def noop(self, args):
pass

Expand Down
52 changes: 32 additions & 20 deletions metaflow/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,6 @@

from metaflow._vendor import click

try:
unicode
except NameError:
unicode = str
basestring = str

# Contains the decorators on which _init was called. We want to ensure it is called
# only once on each decorator and, as the _init() function below can be called in
Expand Down Expand Up @@ -189,7 +184,7 @@ def make_decorator_spec(self):
# escaping but for more complex types (typically dictionaries or lists),
# we dump using JSON.
for k, v in attrs.items():
if isinstance(v, (int, float, unicode, basestring)):
if isinstance(v, (int, float, str)):
attr_list.append("%s=%s" % (k, str(v)))
else:
attr_list.append("%s=%s" % (k, json.dumps(v).replace('"', '\\"')))
Expand Down Expand Up @@ -315,15 +310,36 @@ def package_init(self, flow, step_name, environment):

def add_to_package(self):
"""
Called to add custom packages needed for a decorator. This hook will be
Called to add custom files needed for this decorator. This hook will be
called in the `MetaflowPackage` class where metaflow compiles the code package
tarball. This hook is invoked in the `MetaflowPackage`'s `path_tuples`
function. The `path_tuples` function is a generator that yields a tuple of
`(file_path, arcname)`.`file_path` is the path of the file in the local file system;
the `arcname` is the path of the file in the constructed tarball or the path of the file
after decompressing the tarball.

Returns a list of tuples where each tuple represents (file_path, arcname)
tarball. This hook can return one of two things (the first is for backwards
compatibility -- move to the second):
- a generator yielding a tuple of `(file_path, arcname)` to add files to
the code package. `file_path` is the path to the file on the local filesystem
and `arcname` is the path relative to the packaged code.
- a generator yielding a tuple of `(content, arcname, type)` where:
- type is one of
ContentType.{USER_CONTENT, CODE_CONTENT, MODULE_CONTENT, OTHER_CONTENT}
- for USER_CONTENT:
- the file will be included relative to the directory containing the
user's flow file.
- content: path to the file to include
- arcname: path relative to the directory containing the user's flow file
- for CODE_CONTENT:
- the file will be included relative to the code directory in the package.
This will be the directory containing `metaflow`.
- content: path to the file to include
- arcname: path relative to the code directory in the package
- for MODULE_CONTENT:
- the module will be added to the code package as a python module. It will
be accessible as usual (import <module_name>)
- content: name of the module
- arcname: None (ignored)
- for OTHER_CONTENT:
- the file will be included relative to any other configuration/metadata
files for the flow
- content: path to the file to include
- arcname: path relative to the config directory in the package
"""
return []

Expand Down Expand Up @@ -685,12 +701,8 @@ def foo(self):
f.is_step = True
f.decorators = []
f.config_decorators = []
try:
# python 3
f.name = f.__name__
except:
# python 2
f.name = f.__func__.func_name
f.wrappers = []
f.name = f.__name__
return f


Expand Down
23 changes: 15 additions & 8 deletions metaflow/extension_support/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from itertools import chain
from pathlib import Path

from metaflow.info_file import read_info_file
from metaflow.meta_files import read_info_file


#
Expand Down Expand Up @@ -214,6 +214,10 @@ def package_mfext_all():
yield path_tuple


def package_mfext_all_descriptions():
    """
    Return the module-level `_all_packages` object as-is (no copy is made).

    NOTE(review): presumably this holds the descriptions of the extension
    packages that were discovered -- confirm against where `_all_packages` is
    populated.
    """
    return _all_packages


def load_globals(module, dst_globals, extra_indent=False):
if extra_indent:
extra_indent = " "
Expand Down Expand Up @@ -808,13 +812,16 @@ def _get_extension_packages(ignore_info_file=False, restrict_to_directories=None
" Extends '%s' with config '%s'"
% (_extension_points[idx], config_module)
)
mf_pkg_list.append(package_name)
mf_ext_packages[package_name] = {
"root_paths": [package_path],
"meta_module": meta_module,
"files": files_to_include,
"version": "_local_",
}
if files_to_include:
mf_pkg_list.append(package_name)
mf_ext_packages[package_name] = {
"root_paths": [package_path],
"meta_module": meta_module,
"files": files_to_include,
"version": "_local_",
}
else:
_ext_debug("Skipping package as no files found (empty dir?)")

# Sanity check that we only have one package per configuration file.
# This prevents multiple packages from providing the same named configuration
Expand Down
4 changes: 2 additions & 2 deletions metaflow/extension_support/_empty_file.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# This file serves as a __init__.py for metaflow_extensions when it is packaged
# and needs to remain empty.
# This file serves as a __init__.py for metaflow_extensions or metaflow
# packages when they are packaged and needs to remain empty.
25 changes: 0 additions & 25 deletions metaflow/info_file.py

This file was deleted.

13 changes: 13 additions & 0 deletions metaflow/meta_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Sentinel meaning "the info file has not been read yet". A dedicated object is
# used (rather than a falsy value) so that whatever MFContent.get_info() returns,
# including a falsy result, is cached and not fetched again.
_UNINITIALIZED = object()
_info_file_content = _UNINITIALIZED


def read_info_file():
    """
    Return the info file content, fetching it only on the first call.

    The value returned by `MFContent.get_info()` is stored in the module-level
    `_info_file_content` variable and returned unchanged on subsequent calls.

    Returns
    -------
    Whatever `MFContent.get_info()` returned on the first call.
    """
    global _info_file_content

    # Identity comparison is the idiomatic way to test against a sentinel.
    if _info_file_content is _UNINITIALIZED:
        # Imported here (and only when actually needed) to prevent a circular
        # import.
        from .packaging_sys import MFContent

        _info_file_content = MFContent.get_info()
    return _info_file_content
8 changes: 7 additions & 1 deletion metaflow/metadata_provider/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,11 +674,17 @@ def _register_system_metadata(self, run_id, step_name, task_id, attempt):
if code_sha:
code_url = os.environ.get("METAFLOW_CODE_URL")
code_ds = os.environ.get("METAFLOW_CODE_DS")
code_metadata = os.environ.get("METAFLOW_CODE_METADATA")
metadata.append(
MetaDatum(
field="code-package",
value=json.dumps(
{"ds_type": code_ds, "sha": code_sha, "location": code_url}
{
"ds_type": code_ds,
"sha": code_sha,
"location": code_url,
"metadata": code_metadata,
}
),
type="code-package",
tags=["attempt_id:{0}".format(attempt)],
Expand Down
1 change: 1 addition & 0 deletions metaflow/metaflow_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@
"stubgen",
"userconf",
"conda",
"package",
]

for typ in DEBUG_OPTIONS:
Expand Down
Loading
Loading