-
Notifications
You must be signed in to change notification settings - Fork 787
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Better logs for @conda/@pypi #2080
base: master
Are you sure you want to change the base?
Changes from all commits
40f4b0f
ef81307
b870280
3aedfb7
6d11fff
8c90b87
148f567
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,13 +1,18 @@ | ||
import inspect | ||
import json | ||
import shutil | ||
import sys | ||
import threading | ||
import time | ||
import traceback | ||
from datetime import datetime | ||
from functools import wraps | ||
from itertools import cycle | ||
|
||
import metaflow.tracing as tracing | ||
from metaflow._vendor import click | ||
from metaflow.client.core import get_metadata | ||
from metaflow.system import _system_logger, _system_monitor | ||
|
||
from . import decorators, lint, metaflow_version, namespace, parameters, plugins | ||
from .cli_args import cli_args | ||
|
@@ -24,7 +29,6 @@ | |
DEFAULT_PACKAGE_SUFFIXES, | ||
) | ||
from .metaflow_current import current | ||
from metaflow.system import _system_monitor, _system_logger | ||
from .metaflow_environment import MetaflowEnvironment | ||
from .mflog import LOG_SOURCES, mflog | ||
from .package import MetaflowPackage | ||
|
@@ -68,20 +72,81 @@ def echo_dev_null(*args, **kwargs): | |
pass | ||
|
||
|
||
_animation_thread = None | ||
_animation_stop = threading.Event() | ||
_default_spinner = cycle(["-", "\\", "|", "/"]) | ||
|
||
|
||
def _animate(get_frame, line, err, kwargs, indent):
    """Repeatedly redraw a single animated status line until asked to stop.

    The loop runs until the module-level ``_animation_stop`` event is set.
    Each iteration asks ``get_frame`` for the next frame, pads or truncates
    it to the current terminal width, and writes it after a carriage return
    so that it overwrites the previous frame in place.

    Args:
        get_frame: Callable taking ``line`` and returning the text of the
            next frame.
        line: The message passed to ``get_frame`` on every iteration.
        err: If true, write to stderr; otherwise write to stdout.
        kwargs: Styling keyword arguments forwarded to ``click.secho``.
        indent: If true, prefix every frame with ``INDENT``.
    """
    stream_name = "stderr" if err else "stdout"
    while not _animation_stop.is_set():
        frame = get_frame(line)
        if indent:
            frame = INDENT + frame
        # Re-query the size on every frame so that a resized terminal is
        # honoured; pad/truncate so stale characters are always overwritten.
        terminal_width, _ = shutil.get_terminal_size()
        frame = frame.ljust(terminal_width)[:terminal_width]
        # Plain concatenation instead of an f-string keeps this line usable
        # on interpreters that predate f-strings.
        click.secho("\r" + frame, nl=False, err=err, **kwargs)
        click.get_text_stream(stream_name).flush()
        # Wait on the stop event rather than sleeping: the thread wakes up
        # as soon as a stop is requested instead of finishing a full 0.1s
        # sleep, so joining the thread is not delayed.
        _animation_stop.wait(0.1)
|
||
|
||
def echo_always(line, **kwargs): | ||
global _animation_thread, _animation_stop | ||
|
||
kwargs["err"] = kwargs.get("err", True) | ||
if kwargs.pop("indent", None): | ||
line = "\n".join(INDENT + x for x in line.splitlines()) | ||
if "nl" not in kwargs or kwargs["nl"]: | ||
overwrite = kwargs.pop("overwrite", False) | ||
animate = kwargs.pop("animate", None) | ||
indent = kwargs.pop("indent", False) | ||
|
||
if _animation_thread is not None: | ||
_animation_stop.set() | ||
_animation_thread.join() | ||
_animation_thread = None | ||
_animation_stop.clear() | ||
click.echo("\r", nl=False, err=kwargs["err"]) | ||
|
||
if indent: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. simplify: |
||
if animate: | ||
# For animated output, we prepend INDENT in the animation function | ||
pass | ||
else: | ||
line = INDENT + line | ||
|
||
animation_kwargs = { | ||
"fg": kwargs.get("fg"), | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Fallbacks are needed here to be safe. |
||
"bg": kwargs.get("bg"), | ||
"bold": kwargs.get("bold", False), | ||
"underline": kwargs.get("underline", False), | ||
} | ||
|
||
if animate: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Check how animate and overwrite show up in runtime logs and notebooks. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes please! There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Notebooks still seem broken: animate produces a new line for each frame, even though the |
||
if animate is True: | ||
get_frame = lambda line: f"{next(_default_spinner)} {line}" | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. f-strings are not supported in Python 3.5. |
||
else: | ||
get_frame = animate | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I am confused here -- what is animate? It doesn't seem to be a simple bool here, so I am a bit confused. |
||
_animation_stop.clear() | ||
_animation_thread = threading.Thread( | ||
target=_animate, | ||
args=(get_frame, line, kwargs["err"], animation_kwargs, indent), | ||
) | ||
_animation_thread.start() | ||
return | ||
|
||
if overwrite: | ||
terminal_width, _ = shutil.get_terminal_size() | ||
line = line.ljust(terminal_width)[:terminal_width] | ||
click.echo("\r", nl=False, err=kwargs["err"]) | ||
elif "nl" not in kwargs or kwargs["nl"]: | ||
line += ERASE_TO_EOL | ||
|
||
top = kwargs.pop("padding_top", None) | ||
bottom = kwargs.pop("padding_bottom", None) | ||
highlight = kwargs.pop("highlight", HIGHLIGHT) | ||
if top: | ||
|
||
if top and not overwrite: | ||
click.secho(ERASE_TO_EOL, **kwargs) | ||
|
||
hl_bold = kwargs.pop("highlight_bold", True) | ||
nl = kwargs.pop("nl", True) | ||
nl = kwargs.pop("nl", True) and not overwrite | ||
fg = kwargs.pop("fg", None) | ||
bold = kwargs.pop("bold", False) | ||
kwargs["nl"] = False | ||
|
@@ -104,8 +169,10 @@ def echo_always(line, **kwargs): | |
if nl: | ||
kwargs["nl"] = True | ||
click.secho("", **kwargs) | ||
if bottom: | ||
if bottom and not overwrite: | ||
click.secho(ERASE_TO_EOL, **kwargs) | ||
if overwrite: | ||
click.get_text_stream("stderr" if kwargs["err"] else "stdout").flush() | ||
|
||
|
||
def logger(body="", system_msg=False, head="", bad=False, timestamp=True, nl=True): | ||
|
@@ -960,7 +1027,7 @@ def start( | |
ctx.obj.environment = [ | ||
e for e in ENVIRONMENTS + [MetaflowEnvironment] if e.TYPE == environment | ||
][0](ctx.obj.flow) | ||
ctx.obj.environment.validate_environment(ctx.obj.logger, datastore) | ||
ctx.obj.environment.validate_environment(echo, datastore) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Take care of the symmetric changes for fast bakery. |
||
|
||
ctx.obj.event_logger = LOGGING_SIDECARS[event_logger]( | ||
flow=ctx.obj.flow, env=ctx.obj.environment | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,15 @@ | ||
import bz2 | ||
import concurrent.futures | ||
import io | ||
import json | ||
import os | ||
import shutil | ||
import subprocess | ||
import sys | ||
import tarfile | ||
import time | ||
|
||
import requests | ||
|
||
from metaflow.metaflow_config import DATASTORE_LOCAL_DIR | ||
from metaflow.plugins import DATASTORES | ||
|
@@ -15,6 +19,18 @@ | |
|
||
# Bootstraps a valid conda virtual environment composed of conda and pypi packages | ||
|
||
|
||
def timer(func):
    """Decorate ``func`` so the wall-clock duration of each call is recorded.

    The wrapped function behaves exactly like ``func`` (same arguments, same
    return value, same exceptions). The duration of the most recent call, in
    seconds, is stored on the returned wrapper as ``last_duration``; it is
    ``None`` until the first call finishes.

    Args:
        func: The callable to time.

    Returns:
        A wrapper around ``func`` that preserves its name and docstring.
    """
    from functools import wraps

    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        try:
            return func(*args, **kwargs)
        finally:
            # Record the duration even when ``func`` raises, so that slow
            # failures can be inspected as well.
            wrapper.last_duration = time.time() - start_time

    wrapper.last_duration = None
    return wrapper
|
||
|
||
if __name__ == "__main__": | ||
if len(sys.argv) != 5: | ||
print("Usage: bootstrap.py <flow_name> <id> <datastore_type> <architecture>") | ||
|
@@ -47,6 +63,8 @@ | |
|
||
prefix = os.path.join(os.getcwd(), architecture, id_) | ||
pkgs_dir = os.path.join(os.getcwd(), ".pkgs") | ||
conda_pkgs_dir = os.path.join(pkgs_dir, "conda") | ||
pypi_pkgs_dir = os.path.join(pkgs_dir, "pypi") | ||
manifest_dir = os.path.join(os.getcwd(), DATASTORE_LOCAL_DIR, flow_name) | ||
|
||
datastores = [d for d in DATASTORES if d.TYPE == datastore_type] | ||
|
@@ -64,77 +82,145 @@ | |
os.path.join(os.getcwd(), MAGIC_FILE), | ||
os.path.join(manifest_dir, MAGIC_FILE), | ||
) | ||
|
||
with open(os.path.join(manifest_dir, MAGIC_FILE)) as f: | ||
env = json.load(f)[id_][architecture] | ||
|
||
# Download Conda packages. | ||
conda_pkgs_dir = os.path.join(pkgs_dir, "conda") | ||
with storage.load_bytes([package["path"] for package in env["conda"]]) as results: | ||
for key, tmpfile, _ in results: | ||
def run_cmd(cmd):
    """Run ``cmd`` through the shell and abort the bootstrap if it fails.

    Both output streams are captured. When the command exits with a non-zero
    status, the command and its captured stdout/stderr are printed and the
    process exits with status 1.

    Args:
        cmd: Shell command line (a string) passed to ``subprocess.run`` with
            ``shell=True``.

    Raises:
        SystemExit: With code 1 if the command returns a non-zero status.
    """
    result = subprocess.run(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        # Decode leniently: with strict decoding a single undecodable byte in
        # the tool output would raise UnicodeDecodeError here, even for a
        # command that succeeded.
        errors="replace",
    )
    if result.returncode != 0:
        print(f"Bootstrap failed while executing: {cmd}")
        print("Stdout:", result.stdout)
        print("Stderr:", result.stderr)
        sys.exit(1)
|
||
@timer | ||
def install_micromamba(architecture):
    """Make a ``micromamba`` binary available and return its local path.

    If ``which("micromamba")`` reports an existing installation nothing is
    downloaded. Otherwise the binary is fetched from micro.mamba.pm (unless a
    copy already exists under ``./micromamba/bin``), made executable, and its
    directory is appended to ``PATH`` so that subsequently spawned shells can
    find it.

    Args:
        architecture: Platform identifier interpolated into the download URL.

    Returns:
        The path ``<cwd>/micromamba/bin/micromamba``.
        NOTE(review): when ``which`` finds micromamba elsewhere on PATH this
        path may not exist; the original behaviour is preserved because the
        semantics of the ``which`` helper are not visible here - confirm.

    Raises:
        Exception: If the download does not return HTTP 200 or the binary is
            missing after extraction.
    """
    # TODO: check if mamba or conda are already available on the image
    micromamba_dir = os.path.join(os.getcwd(), "micromamba")
    micromamba_path = os.path.join(micromamba_dir, "bin", "micromamba")

    if which("micromamba"):
        return micromamba_path

    if not os.path.exists(micromamba_path):
        os.makedirs(micromamba_dir, exist_ok=True)
        # TODO: download micromamba from datastore
        url = f"https://micro.mamba.pm/api/micromamba/{architecture}/1.5.7"
        # A timeout keeps the bootstrap from hanging forever on a stalled
        # connection; the context manager releases the connection.
        with requests.get(url, stream=True, timeout=60) as response:
            if response.status_code != 200:
                raise Exception(
                    f"Failed to download micromamba: HTTP {response.status_code}"
                )
            tar_content = bz2.BZ2Decompressor().decompress(response.raw.read())
        with tarfile.open(fileobj=io.BytesIO(tar_content), mode="r:") as tar:
            tar.extract("bin/micromamba", path=micromamba_dir, set_attrs=False)

        # Verify before chmod: chmod on a missing file would raise a generic
        # FileNotFoundError and the dedicated error below would never fire.
        if not os.path.exists(micromamba_path):
            raise Exception("Failed to install Micromamba!")
        os.chmod(micromamba_path, 0o755)

    # Reached only when micromamba is not already on PATH, including the case
    # where the binary was left behind by an earlier run.
    os.environ["PATH"] += os.pathsep + os.path.dirname(micromamba_path)
    return micromamba_path
|
||
@timer | ||
def download_conda_packages(storage, packages, dest_dir):
    """Fetch conda packages from ``storage`` and lay them out under ``dest_dir``.

    Args:
        storage: Object whose ``load_bytes(paths)`` returns a context manager
            yielding ``(key, tmpfile, metadata)`` tuples.
        packages: Iterable of mappings, each with a ``"path"`` entry naming
            the package in ``storage``.
        dest_dir: Directory that receives the packages; created if missing.

    Returns:
        ``dest_dir``.

    Raises:
        Any exception raised while moving a downloaded file into place.
    """

    def process_conda_package(args):
        # Ensure that conda packages go into architecture specific folders.
        # The path looks like REPO/CHANNEL/CONDA_SUBDIR/PACKAGE. We trick
        # Micromamba into believing that all packages are coming from a local
        # channel - the only hurdle is ensuring that packages are organised
        # properly.

        # TODO: consider RAM disk
        key, tmpfile, dest_dir = args
        dest = os.path.join(dest_dir, "/".join(key.split("/")[-2:]))
        os.makedirs(os.path.dirname(dest), exist_ok=True)
        shutil.move(tmpfile, dest)

    os.makedirs(dest_dir, exist_ok=True)
    with storage.load_bytes([package["path"] for package in packages]) as results:
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # executor.map is lazy about errors: an exception raised in a
            # worker only surfaces when its result is retrieved. Draining the
            # iterator makes a failed move abort the bootstrap instead of
            # being silently dropped.
            list(
                executor.map(
                    process_conda_package,
                    [(key, tmpfile, dest_dir) for key, tmpfile, _ in results],
                )
            )
    return dest_dir
|
||
@timer | ||
def download_pypi_packages(storage, packages, dest_dir):
    """Fetch PyPI packages from ``storage`` and place them flat in ``dest_dir``.

    Args:
        storage: Object whose ``load_bytes(paths)`` returns a context manager
            yielding ``(key, tmpfile, metadata)`` tuples.
        packages: Iterable of mappings, each with a ``"path"`` entry naming
            the package in ``storage``.
        dest_dir: Directory that receives the packages; created if missing.

    Returns:
        ``dest_dir``.

    Raises:
        Any exception raised while moving a downloaded file into place.
    """

    def process_pypi_package(args):
        # Each package keeps only its file name; no sub-directories are used.
        key, tmpfile, dest_dir = args
        dest = os.path.join(dest_dir, os.path.basename(key))
        shutil.move(tmpfile, dest)

    os.makedirs(dest_dir, exist_ok=True)
    with storage.load_bytes([package["path"] for package in packages]) as results:
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # executor.map is lazy about errors: an exception raised in a
            # worker only surfaces when its result is retrieved. Draining the
            # iterator makes a failed move abort the bootstrap instead of
            # being silently dropped.
            list(
                executor.map(
                    process_pypi_package,
                    [(key, tmpfile, dest_dir) for key, tmpfile, _ in results],
                )
            )
    return dest_dir
|
||
@timer | ||
def create_conda_environment(prefix, conda_pkgs_dir): | ||
cmd = f'''set -e; | ||
tmpfile=$(mktemp); | ||
echo "@EXPLICIT" > "$tmpfile"; | ||
ls -d {conda_pkgs_dir}/*/* >> "$tmpfile"; | ||
export PATH=$PATH:$(pwd)/micromamba; | ||
if ! command -v micromamba >/dev/null 2>&1; then | ||
echo "Failed to install Micromamba!"; | ||
exit 1; | ||
fi; | ||
fi""", | ||
# Create a conda environment through Micromamba. | ||
f'''set -e; | ||
tmpfile=$(mktemp); | ||
echo "@EXPLICIT" > "$tmpfile"; | ||
ls -d {conda_pkgs_dir}/*/* >> "$tmpfile"; | ||
export PATH=$PATH:$(pwd)/micromamba; | ||
export CONDA_PKGS_DIRS=$(pwd)/micromamba/pkgs; | ||
micromamba create --yes --offline --no-deps --safety-checks=disabled --no-extra-safety-checks --prefix {prefix} --file "$tmpfile"; | ||
rm "$tmpfile"''', | ||
] | ||
|
||
# Download PyPI packages. | ||
if "pypi" in env: | ||
pypi_pkgs_dir = os.path.join(pkgs_dir, "pypi") | ||
with storage.load_bytes( | ||
[package["path"] for package in env["pypi"]] | ||
) as results: | ||
for key, tmpfile, _ in results: | ||
dest = os.path.join(pypi_pkgs_dir, os.path.basename(key)) | ||
os.makedirs(os.path.dirname(dest), exist_ok=True) | ||
shutil.move(tmpfile, dest) | ||
|
||
# Install PyPI packages. | ||
cmds.extend( | ||
[ | ||
f"""set -e; | ||
export PATH=$PATH:$(pwd)/micromamba; | ||
export CONDA_PKGS_DIRS=$(pwd)/micromamba/pkgs; | ||
micromamba run --prefix {prefix} python -m pip --disable-pip-version-check install --root-user-action=ignore --no-compile {pypi_pkgs_dir}/*.whl --no-user""" | ||
] | ||
) | ||
export CONDA_PKGS_DIRS=$(pwd)/micromamba/pkgs; | ||
micromamba create --yes --offline --no-deps --safety-checks=disabled --no-extra-safety-checks --prefix {prefix} --file "$tmpfile"; | ||
rm "$tmpfile"''' | ||
run_cmd(cmd) | ||
|
||
for cmd in cmds: | ||
result = subprocess.run( | ||
cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE | ||
@timer | ||
def install_pypi_packages(prefix, pypi_pkgs_dir): | ||
|
||
cmd = f"""set -e; | ||
export PATH=$PATH:$(pwd)/micromamba; | ||
export CONDA_PKGS_DIRS=$(pwd)/micromamba/pkgs; | ||
micromamba run --prefix {prefix} python -m pip --disable-pip-version-check install --root-user-action=ignore --no-compile --no-index --no-cache-dir --no-deps --prefer-binary --find-links={pypi_pkgs_dir} {pypi_pkgs_dir}/*.whl --no-user""" | ||
run_cmd(cmd) | ||
|
||
# Run the bootstrap stages with as much overlap as their dependencies allow:
#   install micromamba  --+
#   download conda pkgs --+--> create conda env --+
#   download pypi pkgs  --------------------------+--> install pypi pkgs
#
# Every stage is awaited with Future.result() rather than
# concurrent.futures.wait(): wait() only blocks, whereas result() also
# re-raises whatever the stage raised (including the SystemExit produced by a
# failed shell command), so a broken stage stops the bootstrap instead of
# letting later stages run against a half-built environment.
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    # install micromamba, download conda and pypi packages in parallel
    # TODO(review): check if downloading micromamba from the datastore is possible
    future_install_micromamba = executor.submit(install_micromamba, architecture)
    future_download_conda_packages = executor.submit(
        download_conda_packages, storage, env["conda"], conda_pkgs_dir
    )
    future_download_pypi_packages = None
    if "pypi" in env:
        future_download_pypi_packages = executor.submit(
            download_pypi_packages, storage, env["pypi"], pypi_pkgs_dir
        )

    # create conda environment after micromamba is installed and conda
    # packages are downloaded
    future_install_micromamba.result()
    future_download_conda_packages.result()
    future_create_conda_environment = executor.submit(
        create_conda_environment, prefix, conda_pkgs_dir
    )
    # wait for conda environment to be created
    future_create_conda_environment.result()

    if future_download_pypi_packages is not None:
        # install pypi packages after conda environment is created and pypi
        # packages are downloaded
        future_download_pypi_packages.result()
        future_install_pypi_packages = executor.submit(
            install_pypi_packages, prefix, pypi_pkgs_dir
        )
        # wait for pypi packages to be installed
        future_install_pypi_packages.result()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this doesn't seem to respect the same arguments as the non animate case.