From eccd7e585f953bb0a65234dfb224d35d0e4c9c07 Mon Sep 17 00:00:00 2001 From: Oleg Avdeev Date: Fri, 10 Dec 2021 19:57:34 -0800 Subject: [PATCH 01/10] [tracing] opentelemetry support --- metaflow/cli.py | 3 + metaflow/cmd/configure_cmd.py | 1 + metaflow/cmd/main_cli.py | 2 + metaflow/metadata/heartbeat.py | 4 +- metaflow/metaflow_config.py | 2 + metaflow/mflog/save_logs.py | 3 + metaflow/multicore_utils.py | 36 ++-- metaflow/plugins/datatools/s3/s3.py | 15 +- metaflow/plugins/datatools/s3/s3op.py | 5 + metaflow/plugins/kubernetes/kubernetes.py | 2 + metaflow/plugins/kubernetes/kubernetes_cli.py | 2 + metaflow/plugins/kubernetes/kubernetes_job.py | 7 + metaflow/plugins/metadata/service.py | 22 ++- metaflow/runtime.py | 2 + metaflow/sidecar/sidecar_subprocess.py | 4 + metaflow/sidecar/sidecar_worker.py | 2 + metaflow/task.py | 67 ++++--- metaflow/tracing.py | 28 +++ metaflow/tracing_noop.py | 44 +++++ metaflow/tracing_otel.py | 170 ++++++++++++++++++ metaflow/tracing_propagator.py | 75 ++++++++ 21 files changed, 439 insertions(+), 57 deletions(-) create mode 100644 metaflow/tracing.py create mode 100644 metaflow/tracing_noop.py create mode 100644 metaflow/tracing_otel.py create mode 100644 metaflow/tracing_propagator.py diff --git a/metaflow/cli.py b/metaflow/cli.py index ad582043a66..ea223424619 100644 --- a/metaflow/cli.py +++ b/metaflow/cli.py @@ -3,6 +3,7 @@ import traceback from datetime import datetime from functools import wraps +import metaflow.tracing as tracing from metaflow._vendor import click @@ -796,6 +797,7 @@ def resume( runtime.execute() +@tracing.cli_entrypoint("cli/run") @parameters.add_custom_parameters(deploy_mode=True) @cli.command(help="Run the workflow locally.") @common_run_options @@ -894,6 +896,7 @@ def version(obj): echo_always(obj.version) +@tracing.cli_entrypoint("cli/start") @decorators.add_decorator_options @click.command( cls=click.CommandCollection, diff --git a/metaflow/cmd/configure_cmd.py b/metaflow/cmd/configure_cmd.py index d4ef1a5a541..af5bb9dc42a 100644 --- a/metaflow/cmd/configure_cmd.py +++ b/metaflow/cmd/configure_cmd.py @@ -7,6 +7,7 @@ from metaflow.util import to_unicode from metaflow._vendor import click from metaflow.util import to_unicode +import metaflow.tracing as tracing from .util import echo_always, makedirs diff --git a/metaflow/cmd/main_cli.py b/metaflow/cmd/main_cli.py index 0ba635086f0..ea6252f3ee4 100644 --- a/metaflow/cmd/main_cli.py +++ b/metaflow/cmd/main_cli.py @@ -8,9 +8,11 @@ from metaflow.metaflow_version import get_version from .util import echo_always +import metaflow.tracing as tracing @click.group() +@tracing.cli_entrypoint("cli/main") def main(): pass diff --git a/metaflow/metadata/heartbeat.py b/metaflow/metadata/heartbeat.py index 7a73c0cf90d..383417df338 100644 --- a/metaflow/metadata/heartbeat.py +++ b/metaflow/metadata/heartbeat.py @@ -56,7 +56,9 @@ def _ping(self): def _heartbeat(self): if self.hb_url is not None: - response = requests.post(url=self.hb_url, data="{}", headers=self.headers) + response = requests.post( + url=self.hb_url, data="{}", headers=self.headers.copy() + ) # Unfortunately, response.json() returns a string that we need # to cast to json; however when the request encounters an error # the return type is a json blob :/ diff --git a/metaflow/metaflow_config.py b/metaflow/metaflow_config.py index 71d9beecb6a..d8c623346c9 100644 --- a/metaflow/metaflow_config.py +++ b/metaflow/metaflow_config.py @@ -399,6 +399,8 @@ KUBERNETES_SANDBOX_INIT_SCRIPT = from_conf("KUBERNETES_SANDBOX_INIT_SCRIPT") +OTEL_ENDPOINT = from_conf("OTEL_ENDPOINT") + # MAX_ATTEMPTS is the maximum number of attempts, including the first # task, retries, and the final fallback task and its retries. # diff --git a/metaflow/mflog/save_logs.py b/metaflow/mflog/save_logs.py index ea7ca673288..0ff6706e6f9 100644 --- a/metaflow/mflog/save_logs.py +++ b/metaflow/mflog/save_logs.py @@ -8,9 +8,12 @@ from metaflow.util import Path from . import TASK_LOG_SOURCE +from metaflow.tracing import cli_entrypoint + SMALL_FILE_LIMIT = 1024 * 1024 +@cli_entrypoint("save_logs") def save_logs(): def _read_file(path): with open(path, "rb") as f: diff --git a/metaflow/multicore_utils.py b/metaflow/multicore_utils.py index 73ad909b3a4..5b9828af18b 100644 --- a/metaflow/multicore_utils.py +++ b/metaflow/multicore_utils.py @@ -5,6 +5,7 @@ from multiprocessing import cpu_count from tempfile import NamedTemporaryFile import time +import metaflow.tracing as tracing try: # Python 2 @@ -40,23 +41,24 @@ def _spawn(func, arg, dir): if pid: return pid, output_file else: - try: - exit_code = 1 - ret = func(arg) - with open(output_file, "wb") as f: - pickle.dump(ret, f, protocol=pickle.HIGHEST_PROTOCOL) - exit_code = 0 - except: - # we must not let any exceptions escape this function - # which might trigger unintended side effects - traceback.print_exc() - finally: - sys.stderr.flush() - sys.stdout.flush() - # we can't use sys.exit(0) here since it raises SystemExit - # that may have unintended side effects (e.g. triggering - # finally blocks). - os._exit(exit_code) + with tracing.post_fork(): + try: + exit_code = 1 + ret = func(arg) + with open(output_file, "wb") as f: + pickle.dump(ret, f, protocol=pickle.HIGHEST_PROTOCOL) + exit_code = 0 + except: + # we must not let any exceptions escape this function + # which might trigger unintended side-effects + traceback.print_exc() + finally: + sys.stderr.flush() + sys.stdout.flush() + # we can't use sys.exit(0) here since it raises SystemExit + # that may have unintended side-effects (e.g. triggering + # finally blocks). + os._exit(exit_code) def parallel_imap_unordered(func, iterable, max_parallel=None, dir=None): diff --git a/metaflow/plugins/datatools/s3/s3.py b/metaflow/plugins/datatools/s3/s3.py index c5d7ddcc12f..103b86ba837 100644 --- a/metaflow/plugins/datatools/s3/s3.py +++ b/metaflow/plugins/datatools/s3/s3.py @@ -31,6 +31,7 @@ ) from metaflow.exception import MetaflowException from metaflow.debug import debug +import metaflow.tracing as tracing try: # python2 @@ -1173,9 +1174,14 @@ def put( def _upload(s3, _): # We make sure we are at the beginning in case we are retrying blob.seek(0) - s3.upload_fileobj( - blob, src.netloc, src.path.lstrip("/"), ExtraArgs=extra_args - ) + + # We use manual tracing here because boto3 instrumentation + # has an issue with upload_fileobj losing track of tracing context + # https://github.com/open-telemetry/opentelemetry-python-contrib/issues/298 + with tracing.traced("s3.upload_fileobj", {"path": src.path}): + s3.upload_fileobj( + blob, src.netloc, src.path.lstrip("/"), ExtraArgs=extra_args + ) if overwrite: self._one_boto_op(_upload, url, create_tmp_file=False) @@ -1605,10 +1611,13 @@ def try_s3_op(last_ok_count, pending_retries, out_lines, inject_failures): try: debug.s3client_exec(cmdline + addl_cmdline) # Run the operation. + env = os.environ.copy() + tracing.inject_tracing_vars(env) stdout = subprocess.check_output( cmdline + addl_cmdline, cwd=self._tmpdir, stderr=stderr.file, + env=env, ) # Here we did not have any error -- transient or otherwise. ok_lines = stdout.splitlines() diff --git a/metaflow/plugins/datatools/s3/s3op.py b/metaflow/plugins/datatools/s3/s3op.py index 8d337b593f9..81ac22304b5 100644 --- a/metaflow/plugins/datatools/s3/s3op.py +++ b/metaflow/plugins/datatools/s3/s3op.py @@ -43,6 +43,7 @@ TRANSIENT_RETRY_LINE_CONTENT, TRANSIENT_RETRY_START_LINE, ) +import metaflow.tracing as tracing NUM_WORKERS_DEFAULT = 64 @@ -153,6 +154,7 @@ def normalize_client_error(err): # S3 worker pool +@tracing.cli_entrypoint("s3op/worker") def worker(result_file_name, queue, mode, s3config): # Interpret mode, it can either be a single op or something like # info_download or info_upload which implies: @@ -719,6 +721,7 @@ def cli(): pass +@tracing.cli_entrypoint("s3op/list") @cli.command("list", help="List S3 objects") @click.option( "--recursive/--no-recursive", @@ -778,6 +781,7 @@ def lst( print(format_result_line(idx, url.prefix, url.url, str(size))) +@tracing.cli_entrypoint("s3op/put") @cli.command(help="Upload files to S3") @click.option( "--file", @@ -972,6 +976,7 @@ def _populate_prefixes(prefixes, inputs): return prefixes, is_transient_retry +@tracing.cli_entrypoint("s3op/get") @cli.command(help="Download files from S3") @click.option( "--recursive/--no-recursive", diff --git a/metaflow/plugins/kubernetes/kubernetes.py b/metaflow/plugins/kubernetes/kubernetes.py index 410651f1e29..932960b41fa 100644 --- a/metaflow/plugins/kubernetes/kubernetes.py +++ b/metaflow/plugins/kubernetes/kubernetes.py @@ -34,6 +34,7 @@ SERVICE_HEADERS, SERVICE_INTERNAL_URL, S3_SERVER_SIDE_ENCRYPTION, + OTEL_ENDPOINT, ) from metaflow.metaflow_config_funcs import config_values @@ -255,6 +256,7 @@ def create_job( .environment_variable( "METAFLOW_INIT_SCRIPT", KUBERNETES_SANDBOX_INIT_SCRIPT ) + .environment_variable("METAFLOW_OTEL_ENDPOINT", OTEL_ENDPOINT) # Skip setting METAFLOW_DATASTORE_SYSROOT_LOCAL because metadata sync # between the local user instance and the remote Kubernetes pod # assumes metadata is stored in DATASTORE_LOCAL_DIR on the Kubernetes diff --git a/metaflow/plugins/kubernetes/kubernetes_cli.py b/metaflow/plugins/kubernetes/kubernetes_cli.py index e759973f3b7..559da6622ff 100644 --- a/metaflow/plugins/kubernetes/kubernetes_cli.py +++ b/metaflow/plugins/kubernetes/kubernetes_cli.py @@ -9,6 +9,7 @@ from metaflow.metadata.util import sync_local_metadata_from_datastore from metaflow.metaflow_config import DATASTORE_LOCAL_DIR, KUBERNETES_LABELS from metaflow.mflog import TASK_LOG_SOURCE +import metaflow.tracing as tracing from .kubernetes import Kubernetes, KubernetesKilledException, parse_kube_keyvalue_list from .kubernetes_decorator import KubernetesDecorator @@ -24,6 +25,7 @@ def kubernetes(): pass +@tracing.cli_entrypoint("kubernetes/step") @kubernetes.command( help="Execute a single task on Kubernetes. This command calls the top-level step " "command inside a Kubernetes pod with the given options. Typically you do not call " diff --git a/metaflow/plugins/kubernetes/kubernetes_job.py b/metaflow/plugins/kubernetes/kubernetes_job.py index ee05fdb6920..460385e90fc 100644 --- a/metaflow/plugins/kubernetes/kubernetes_job.py +++ b/metaflow/plugins/kubernetes/kubernetes_job.py @@ -3,6 +3,9 @@ import random import time +from metaflow.tracing import inject_tracing_vars + + from metaflow.exception import MetaflowException from metaflow.metaflow_config import KUBERNETES_SECRETS @@ -135,6 +138,10 @@ def create(self): "METAFLOW_KUBERNETES_SERVICE_ACCOUNT_NAME": "spec.serviceAccountName", "METAFLOW_KUBERNETES_NODE_IP": "status.hostIP", }.items() + ] + + [ + client.V1EnvVar(name=k, value=str(v)) + for k, v in inject_tracing_vars({}).items() ], env_from=[ client.V1EnvFromSource( diff --git a/metaflow/plugins/metadata/service.py b/metaflow/plugins/metadata/service.py index 150fdfc6c6b..c917f320cd5 100644 --- a/metaflow/plugins/metadata/service.py +++ b/metaflow/plugins/metadata/service.py @@ -59,7 +59,7 @@ def __init__(self, environment, flow, event_logger, monitor): def compute_info(cls, val): v = val.rstrip("/") try: - resp = requests.get(os.path.join(v, "ping"), headers=SERVICE_HEADERS) + resp = requests.get(os.path.join(v, "ping"), headers=SERVICE_HEADERS.copy()) resp.raise_for_status() except: # noqa E722 raise ValueError("Metaflow service [%s] unreachable." % v) @@ -413,25 +413,29 @@ def _request( if method == "GET": if monitor: with monitor.measure("metaflow.service_metadata.get"): - resp = requests.get(url, headers=SERVICE_HEADERS) + resp = requests.get(url, headers=SERVICE_HEADERS.copy()) else: - resp = requests.get(url, headers=SERVICE_HEADERS) + resp = requests.get(url, headers=SERVICE_HEADERS.copy()) elif method == "POST": if monitor: with monitor.measure("metaflow.service_metadata.post"): resp = requests.post( - url, headers=SERVICE_HEADERS, json=data + url, headers=SERVICE_HEADERS.copy(), json=data ) else: - resp = requests.post(url, headers=SERVICE_HEADERS, json=data) + resp = requests.post( + url, headers=SERVICE_HEADERS.copy(), json=data + ) elif method == "PATCH": if monitor: with monitor.measure("metaflow.service_metadata.patch"): resp = requests.patch( - url, headers=SERVICE_HEADERS, json=data + url, headers=SERVICE_HEADERS.copy(), json=data ) else: - resp = requests.patch(url, headers=SERVICE_HEADERS, json=data) + resp = requests.patch( + url, headers=SERVICE_HEADERS.copy(), json=data + ) else: raise MetaflowInternalError("Unexpected HTTP method %s" % (method,)) except MetaflowInternalError: @@ -496,9 +500,9 @@ def _version(cls, monitor): try: if monitor: with monitor.measure("metaflow.service_metadata.get"): - resp = requests.get(url, headers=SERVICE_HEADERS) + resp = requests.get(url, headers=SERVICE_HEADERS.copy()) else: - resp = requests.get(url, headers=SERVICE_HEADERS) + resp = requests.get(url, headers=SERVICE_HEADERS.copy()) except: if monitor: with monitor.count("metaflow.service_metadata.failed_request"): diff --git a/metaflow/runtime.py b/metaflow/runtime.py index 941fd9a7098..2f95fd2464f 100644 --- a/metaflow/runtime.py +++ b/metaflow/runtime.py @@ -35,6 +35,7 @@ UBF_CONTROL, UBF_TASK, ) +import metaflow.tracing as tracing MAX_WORKERS = 16 MAX_NUM_SPLITS = 100 @@ -1235,6 +1236,7 @@ def _launch(self): ) env.update(args.get_env()) env["PYTHONUNBUFFERED"] = "x" + tracing.inject_tracing_vars(env) # NOTE bufsize=1 below enables line buffering which is required # by read_logline() below that relies on readline() not blocking # print('running', args) diff --git a/metaflow/sidecar/sidecar_subprocess.py b/metaflow/sidecar/sidecar_subprocess.py index 0b2ee99dd3d..56681ecbcfd 100644 --- a/metaflow/sidecar/sidecar_subprocess.py +++ b/metaflow/sidecar/sidecar_subprocess.py @@ -12,6 +12,7 @@ from .sidecar_messages import Message, MessageTypes from ..debug import debug +from ..tracing import inject_tracing_vars MUST_SEND_RETRY_TIMES = 4 MESSAGE_WRITE_TIMEOUT_IN_MS = 1000 @@ -127,11 +128,14 @@ def send(self, msg, retries=3): def _start_subprocess(self, cmdline): for _ in range(3): try: + env = os.environ.copy() + inject_tracing_vars(env) # Set stdout=sys.stdout & stderr=sys.stderr # to print to console the output of sidecars. return subprocess.Popen( cmdline, stdin=subprocess.PIPE, + env=env, stdout=sys.stdout if debug.sidecar else subprocess.DEVNULL, stderr=sys.stderr if debug.sidecar else subprocess.DEVNULL, bufsize=0, diff --git a/metaflow/sidecar/sidecar_worker.py b/metaflow/sidecar/sidecar_worker.py index 6f264215d40..fab675a92f9 100644 --- a/metaflow/sidecar/sidecar_worker.py +++ b/metaflow/sidecar/sidecar_worker.py @@ -14,6 +14,7 @@ from metaflow.sidecar import Message, MessageTypes from metaflow.plugins import SIDECARS from metaflow._vendor import click +import metaflow.tracing as tracing def process_messages(worker_type, worker): @@ -47,6 +48,7 @@ def process_messages(worker_type, worker): pass +@tracing.cli_entrypoint("sidecar") @click.command(help="Initialize workers") @click.argument("worker-type") def main(worker_type): diff --git a/metaflow/task.py b/metaflow/task.py index ccba14c3c6a..073daab4ede 100644 --- a/metaflow/task.py +++ b/metaflow/task.py @@ -21,7 +21,7 @@ from .unbounded_foreach import UBF_CONTROL from .util import all_equal, get_username, resolve_identity, unicode_type from .current import current - +from metaflow.tracing import get_trace_id from collections import namedtuple ForeachFrame = namedtuple("ForeachFrame", ["step", "var", "num_splits", "index"]) @@ -396,36 +396,49 @@ def run_step( ) metadata_tags = ["attempt_id:{0}".format(retry_count)] + + metadata = [ + MetaDatum( + field="attempt", + value=str(retry_count), + type="attempt", + tags=metadata_tags, + ), + MetaDatum( + field="origin-run-id", + value=str(origin_run_id), + type="origin-run-id", + tags=metadata_tags, + ), + MetaDatum( + field="ds-type", + value=self.flow_datastore.TYPE, + type="ds-type", + tags=metadata_tags, + ), + MetaDatum( + field="ds-root", + value=self.flow_datastore.datastore_root, + type="ds-root", + tags=metadata_tags, + ), + ] + trace_id = get_trace_id() + if trace_id: + metadata.append( + MetaDatum( + field="otel-trace-id", + value=trace_id, + type="trace-id", + tags=metadata_tags, + ) + ) + self.metadata.register_metadata( run_id, step_name, task_id, - [ - MetaDatum( - field="attempt", - value=str(retry_count), - type="attempt", - tags=metadata_tags, - ), - MetaDatum( - field="origin-run-id", - value=str(origin_run_id), - type="origin-run-id", - tags=metadata_tags, - ), - MetaDatum( - field="ds-type", - value=self.flow_datastore.TYPE, - type="ds-type", - tags=metadata_tags, - ), - MetaDatum( - field="ds-root", - value=self.flow_datastore.datastore_root, - type="ds-root", - tags=metadata_tags, - ), - ], + metadata, ) step_func = getattr(self.flow, step_name) diff --git a/metaflow/tracing.py b/metaflow/tracing.py new file mode 100644 index 00000000000..00eeda0b402 --- /dev/null +++ b/metaflow/tracing.py @@ -0,0 +1,28 @@ +import sys +from metaflow.metaflow_config import ( + OTEL_ENDPOINT, +) + +import metaflow.tracing_noop + +init_tracing = metaflow.tracing_noop.init_tracing +cli_entrypoint = metaflow.tracing_noop.cli_entrypoint +inject_tracing_vars = metaflow.tracing_noop.inject_tracing_vars +get_trace_id = metaflow.tracing_noop.get_trace_id +traced = metaflow.tracing_noop.traced +tracing = metaflow.tracing_noop.tracing +post_fork = metaflow.tracing_noop.post_fork + +if OTEL_ENDPOINT: + try: + import metaflow.tracing_otel + + init_tracing = metaflow.tracing_otel.init_tracing + cli_entrypoint = metaflow.tracing_otel.cli_entrypoint + inject_tracing_vars = metaflow.tracing_otel.inject_tracing_vars + get_trace_id = metaflow.tracing_otel.get_trace_id + traced = metaflow.tracing_otel.traced + tracing = metaflow.tracing_otel.tracing + post_fork = metaflow.tracing_otel.post_fork + except ImportError: + pass diff --git a/metaflow/tracing_noop.py b/metaflow/tracing_noop.py new file mode 100644 index 00000000000..8fd448945c1 --- /dev/null +++ b/metaflow/tracing_noop.py @@ -0,0 +1,44 @@ +from functools import wraps +import contextlib +from typing import Dict + + +def init_tracing(): + pass + + +@contextlib.contextmanager +def post_fork(): + yield + + +def cli_entrypoint(name: str): + def cli_entrypoint_wrap(func): + @wraps(func) + def wrapper_func(*args, **kwargs): + return func(*args, **kwargs) + + return wrapper_func + + return cli_entrypoint_wrap + + +def inject_tracing_vars(env_dict: Dict[str, str]) -> Dict[str, str]: + return env_dict + + +def get_trace_id() -> str: + return "" + + +@contextlib.contextmanager +def traced(name, attrs={}): + yield + + +def tracing(func): + @wraps(func) + def wrapper_func(*args, **kwargs): + return func(*args, **kwargs) + + return wrapper_func diff --git a/metaflow/tracing_otel.py b/metaflow/tracing_otel.py new file mode 100644 index 00000000000..72b94164dfa --- /dev/null +++ b/metaflow/tracing_otel.py @@ -0,0 +1,170 @@ +import os + +import sys +from functools import wraps +import contextlib +from typing import Dict, List, Optional +from opentelemetry import trace as trace_api, context +from opentelemetry.trace.span import format_trace_id +from opentelemetry.propagate import extract, inject +from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( + OTLPSpanExporter, +) +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.propagate import set_global_textmap +from opentelemetry.sdk.resources import SERVICE_NAME, Resource + +tracer_provider = None + + +def init_tracing(): + from metaflow.metaflow_config import ( + SERVICE_AUTH_KEY, + SERVICE_HEADERS, + OTEL_ENDPOINT, + ) + + global tracer_provider + if tracer_provider is not None: + print("Tracing already initialized", file=sys.stderr) + return + # print("initializing tracing", os.environ.get("traceparent")) + + from metaflow.tracing_propagator import EnvPropagator + + set_global_textmap(EnvPropagator(None)) + + if SERVICE_AUTH_KEY: + span_exporter = OTLPSpanExporter( + endpoint=OTEL_ENDPOINT, + headers={"x-api-key": SERVICE_AUTH_KEY}, + timeout=1, + ) + elif SERVICE_HEADERS: + span_exporter = OTLPSpanExporter( + endpoint=OTEL_ENDPOINT, + headers=SERVICE_HEADERS, + timeout=1, + ) + else: + print("WARNING: no auth settings for Opentelemetry", file=sys.stderr) + return + + if "METAFLOW_KUBERNETES_POD_NAMESPACE" in os.environ: + service_name = "metaflow-kubernetes" + elif "AWS_BATCH_JOB_ID" in os.environ: + service_name = "metaflow-awsbatch" + else: + service_name = "metaflow-local" + + tracer_provider = TracerProvider( + resource=Resource.create({SERVICE_NAME: service_name}) + ) + trace_api.set_tracer_provider(tracer_provider) + + span_processor = BatchSpanProcessor(span_exporter) + tracer_provider.add_span_processor(span_processor) + + import requests + from opentelemetry.instrumentation.requests import RequestsInstrumentor + + RequestsInstrumentor().instrument() + + +@contextlib.contextmanager +def post_fork(): + global tracer_provider + tracer_provider = None + init_tracing() + token = context.attach(extract(os.environ)) + try: + tracer = trace_api.get_tracer_provider().get_tracer(__name__) + with tracer.start_as_current_span( + "fork", kind=trace_api.SpanKind.SERVER + ) as span: + span.set_attribute("cmd", " ".join(sys.argv)) + yield + finally: + context.detach(token) + + +def _extract_token_after(tokens: List[str], before_token: str) -> Optional[str]: + for i, tok in enumerate(tokens): + if i > 0 and tokens[i - 1] == before_token: + return tok + + +def cli_entrypoint(name: str): + def cli_entrypoint_wrap(func): + @wraps(func) + def wrapper_func(*args, **kwargs): + global tracer_provider + + init_tracing() + + assert tracer_provider is not None # make type checker happy + + token = context.attach(extract(os.environ)) + try: + tracer = trace_api.get_tracer_provider().get_tracer(__name__) + + card_subcommand = _extract_token_after(sys.argv, "card") + + step_name = _extract_token_after(sys.argv, "step") + task_id = _extract_token_after(sys.argv, "--task-id") + run_id = _extract_token_after(sys.argv, "--run-id") + if step_name and task_id and run_id: + better_name = "/".join([run_id, step_name, task_id]) + elif card_subcommand: + better_name = "card/" + card_subcommand + elif "run" in sys.argv: + better_name = "run" + else: + better_name = None + + with tracer.start_as_current_span( + better_name or name, kind=trace_api.SpanKind.SERVER + ) as span: + span.set_attribute("cmd", " ".join(sys.argv)) + span.set_attribute("pid", str(os.getpid())) + return func(*args, **kwargs) + finally: + context.detach(token) + try: + tracer_provider.force_flush() + except Exception as e: # pylint: disable=broad-except + print("TracerProvider failed to flush traces", e, file=sys.stderr) + + return wrapper_func + + return cli_entrypoint_wrap + + +def inject_tracing_vars(env_dict: Dict[str, str]) -> Dict[str, str]: + inject(env_dict) + return env_dict + + +def get_trace_id() -> str: + return format_trace_id(trace_api.get_current_span().get_span_context().trace_id) + + +@contextlib.contextmanager +def traced(name, attrs={}): + tracer = trace_api.get_tracer_provider().get_tracer(__name__) + with tracer.start_as_current_span(name) as span: + for k, v in attrs.items(): + span.set_attribute(k, v) + yield + + +def tracing(func): + @wraps(func) + def wrapper_func(*args, **kwargs): + tracer = trace_api.get_tracer_provider().get_tracer(func.__module__.__name__) + + with tracer.start_as_current_span(func.__name__): + return func(*args, **kwargs) + + return wrapper_func diff --git a/metaflow/tracing_propagator.py b/metaflow/tracing_propagator.py new file mode 100644 index 00000000000..e005f18a20b --- /dev/null +++ b/metaflow/tracing_propagator.py @@ -0,0 +1,75 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import typing +import logging + +from opentelemetry.context import Context + +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator +from opentelemetry.context.context import Context +from opentelemetry.propagators.textmap import ( + DefaultGetter, + DefaultSetter, + Getter, + Setter, + TextMapPropagator, + CarrierT, + CarrierValT, +) + + +class EnvPropagator(TextMapPropagator): + def __init__(self, formatter): + if formatter is None: + self.formatter = TraceContextTextMapPropagator() + else: + self.formatter = formatter + + # delegating to extract function implementation of the formatter + def extract( + self, + carrier: CarrierT, + context: typing.Optional[Context] = None, + getter: Getter = DefaultGetter(), + ) -> Context: + return self.formatter.extract(carrier=carrier, context=context, getter=getter) + + # delegating to inject function implementation of the formatter + def inject( + self, + carrier: CarrierT, + context: typing.Optional[Context] = None, + setter: Setter = DefaultSetter(), + ) -> None: + self.formatter.inject(carrier=carrier, context=context, setter=setter) + + # function for the user to inject trace details or baggage + def inject_to_carrier(self, context: typing.Optional[Context] = None): + env_dict = os.environ.copy() + self.inject(carrier=env_dict, context=context, setter=DefaultSetter()) + return env_dict + + # function for the user to extract trace context or baggage + def extract_context(self) -> Context: + if self.formatter is None: + self.formatter = TraceContextTextMapPropagator() + + return self.extract(carrier=os.environ, getter=DefaultGetter()) + + @property + def fields(self) -> typing.Set[str]: + # Returns a set with the fields set in `inject`. + return self.formatter.fields From e1aac632eb5d4ac6aff0b489af6d2c4d2cf50f02 Mon Sep 17 00:00:00 2001 From: Chaoying Date: Thu, 22 Jun 2023 01:26:08 +0000 Subject: [PATCH 02/10] refactor tracing module to folders --- metaflow/metaflow_config.py | 2 + metaflow/multicore_utils.py | 1 - metaflow/sidecar/sidecar_subprocess.py | 2 +- metaflow/tracing.py | 28 --------- .../{tracing_noop.py => tracing/__init__.py} | 13 ++++ .../propagator.py} | 0 metaflow/tracing/span_exporter.py | 61 +++++++++++++++++++ .../tracing_modules.py} | 46 ++++---------- 8 files changed, 88 insertions(+), 65 deletions(-) delete mode 100644 metaflow/tracing.py rename metaflow/{tracing_noop.py => tracing/__init__.py} (72%) rename metaflow/{tracing_propagator.py => tracing/propagator.py} (100%) create mode 100644 metaflow/tracing/span_exporter.py rename metaflow/{tracing_otel.py => tracing/tracing_modules.py} (84%) diff --git a/metaflow/metaflow_config.py b/metaflow/metaflow_config.py index d8c623346c9..09ea9529ea7 100644 --- a/metaflow/metaflow_config.py +++ b/metaflow/metaflow_config.py @@ -400,6 +400,8 @@ KUBERNETES_SANDBOX_INIT_SCRIPT = from_conf("KUBERNETES_SANDBOX_INIT_SCRIPT") OTEL_ENDPOINT = from_conf("OTEL_ENDPOINT") +ZIPKIN_ENDPOINT = from_conf("ZIPKIN_ENDPOINT") +CONSOLE_TRACE_ENABLED = from_conf("CONSOLE_TRACE_ENABLED", False) # MAX_ATTEMPTS is the maximum number of attempts, including the first # task, retries, and the final fallback task and its retries. diff --git a/metaflow/multicore_utils.py b/metaflow/multicore_utils.py index 5b9828af18b..8f5d28ca67c 100644 --- a/metaflow/multicore_utils.py +++ b/metaflow/multicore_utils.py @@ -6,7 +6,6 @@ from tempfile import NamedTemporaryFile import time import metaflow.tracing as tracing - try: # Python 2 import cPickle as pickle diff --git a/metaflow/sidecar/sidecar_subprocess.py b/metaflow/sidecar/sidecar_subprocess.py index 56681ecbcfd..67cf39eaa5a 100644 --- a/metaflow/sidecar/sidecar_subprocess.py +++ b/metaflow/sidecar/sidecar_subprocess.py @@ -12,7 +12,7 @@ from .sidecar_messages import Message, MessageTypes from ..debug import debug -from ..tracing import inject_tracing_vars +from metaflow.tracing import inject_tracing_vars MUST_SEND_RETRY_TIMES = 4 MESSAGE_WRITE_TIMEOUT_IN_MS = 1000 diff --git a/metaflow/tracing.py b/metaflow/tracing.py deleted file mode 100644 index 00eeda0b402..00000000000 --- a/metaflow/tracing.py +++ /dev/null @@ -1,28 +0,0 @@ -import sys -from metaflow.metaflow_config import ( - OTEL_ENDPOINT, -) - -import metaflow.tracing_noop - -init_tracing = metaflow.tracing_noop.init_tracing -cli_entrypoint = metaflow.tracing_noop.cli_entrypoint -inject_tracing_vars = metaflow.tracing_noop.inject_tracing_vars -get_trace_id = metaflow.tracing_noop.get_trace_id -traced = metaflow.tracing_noop.traced -tracing = metaflow.tracing_noop.tracing -post_fork = metaflow.tracing_noop.post_fork - -if OTEL_ENDPOINT: - try: - import metaflow.tracing_otel - - init_tracing = metaflow.tracing_otel.init_tracing - cli_entrypoint = metaflow.tracing_otel.cli_entrypoint - inject_tracing_vars = metaflow.tracing_otel.inject_tracing_vars - get_trace_id = metaflow.tracing_otel.get_trace_id - traced = metaflow.tracing_otel.traced - tracing = metaflow.tracing_otel.tracing - post_fork = metaflow.tracing_otel.post_fork - except ImportError: - pass diff --git a/metaflow/tracing_noop.py b/metaflow/tracing/__init__.py similarity index 72% rename from metaflow/tracing_noop.py rename to metaflow/tracing/__init__.py index 8fd448945c1..6094630a7ad 100644 --- a/metaflow/tracing_noop.py +++ b/metaflow/tracing/__init__.py @@ -1,3 +1,9 @@ +import sys +from metaflow.metaflow_config import ( + OTEL_ENDPOINT, + ZIPKIN_ENDPOINT, + CONSOLE_TRACE_ENABLED, +) from functools import wraps import contextlib from typing import Dict @@ -42,3 +48,10 @@ def wrapper_func(*args, **kwargs): return func(*args, **kwargs) return wrapper_func + +if CONSOLE_TRACE_ENABLED or OTEL_ENDPOINT or ZIPKIN_ENDPOINT: + try: + from .tracing_modules import * + + except ImportError as e: + print(e.msg) diff --git a/metaflow/tracing_propagator.py b/metaflow/tracing/propagator.py similarity index 100% rename from metaflow/tracing_propagator.py rename to metaflow/tracing/propagator.py diff --git a/metaflow/tracing/span_exporter.py b/metaflow/tracing/span_exporter.py new file mode 100644 index 00000000000..45a3e4dc794 --- /dev/null +++ b/metaflow/tracing/span_exporter.py @@ -0,0 +1,61 @@ +import sys +from metaflow.metaflow_config import ( + OTEL_ENDPOINT, + ZIPKIN_ENDPOINT, + CONSOLE_TRACE_ENABLED +) + +from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( + OTLPSpanExporter, +) +from opentelemetry.exporter.zipkin.proto.http import ZipkinExporter + +from opentelemetry.sdk.trace.export import ConsoleSpanExporter + +def get_span_exporter(): + if OTEL_ENDPOINT: + return set_otel_exporter() + + elif ZIPKIN_ENDPOINT: + return set_zipkin_exporter() + elif CONSOLE_TRACE_ENABLED: + return set_console_exporter() + else: + print("WARNING: endpoints not set up for Opentelemetry", file=sys.stderr) + return + + + +def set_otel_exporter(): + from metaflow.metaflow_config import ( + SERVICE_AUTH_KEY, + SERVICE_HEADERS, + ) + + if SERVICE_AUTH_KEY: + span_exporter = OTLPSpanExporter( + endpoint=OTEL_ENDPOINT, + headers={"x-api-key": SERVICE_AUTH_KEY}, + timeout=1, + ) + elif SERVICE_HEADERS: + span_exporter = OTLPSpanExporter( + endpoint=OTEL_ENDPOINT, + headers=SERVICE_HEADERS, + timeout=1, + ) + else: + print("WARNING: no auth settings for Opentelemetry", file=sys.stderr) + return + return span_exporter + +def set_zipkin_exporter(): + span_exporter = ZipkinExporter( + endpoint=ZIPKIN_ENDPOINT, + ) + return span_exporter + +def set_console_exporter(): + span_exporter = ConsoleSpanExporter() + return span_exporter + diff --git a/metaflow/tracing_otel.py b/metaflow/tracing/tracing_modules.py similarity index 84% rename from metaflow/tracing_otel.py rename to metaflow/tracing/tracing_modules.py index 72b94164dfa..503b9ebf98a 100644 --- a/metaflow/tracing_otel.py +++ b/metaflow/tracing/tracing_modules.py @@ -1,55 +1,31 @@ import os - import sys -from functools import wraps -import contextlib -from typing import Dict, List, Optional -from opentelemetry import trace as trace_api, context -from opentelemetry.trace.span import format_trace_id -from opentelemetry.propagate import extract, inject -from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( - OTLPSpanExporter, -) + from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.propagate import set_global_textmap from opentelemetry.sdk.resources import SERVICE_NAME, Resource +from opentelemetry.trace.span import format_trace_id +from opentelemetry.propagate import extract, inject +from functools import wraps +import contextlib +from typing import Dict, List, Optional +from opentelemetry import trace as trace_api, context +from .span_exporter import get_span_exporter tracer_provider = None - def init_tracing(): - from metaflow.metaflow_config import ( - SERVICE_AUTH_KEY, - SERVICE_HEADERS, - OTEL_ENDPOINT, - ) - global tracer_provider if tracer_provider is not None: print("Tracing already initialized", file=sys.stderr) return - # print("initializing tracing", os.environ.get("traceparent")) - from metaflow.tracing_propagator import EnvPropagator + from .propagator import EnvPropagator set_global_textmap(EnvPropagator(None)) + span_exporter = get_span_exporter() - if SERVICE_AUTH_KEY: - span_exporter = OTLPSpanExporter( - endpoint=OTEL_ENDPOINT, - headers={"x-api-key": SERVICE_AUTH_KEY}, - timeout=1, - ) - elif SERVICE_HEADERS: - span_exporter = OTLPSpanExporter( - endpoint=OTEL_ENDPOINT, - headers=SERVICE_HEADERS, - timeout=1, - ) - else: - print("WARNING: no auth settings for Opentelemetry", file=sys.stderr) - return if "METAFLOW_KUBERNETES_POD_NAMESPACE" in os.environ: service_name = "metaflow-kubernetes" @@ -66,12 +42,12 @@ def init_tracing(): span_processor = BatchSpanProcessor(span_exporter) tracer_provider.add_span_processor(span_processor) - import requests from opentelemetry.instrumentation.requests import RequestsInstrumentor RequestsInstrumentor().instrument() + @contextlib.contextmanager def post_fork(): global tracer_provider From de48b9a4ca0106350b65cd2706480f1daab8ca18 Mon Sep 17 00:00:00 2001 From: Chaoying Date: Thu, 22 Jun 2023 01:27:44 +0000 Subject: [PATCH 03/10] reformat code using black --- metaflow/multicore_utils.py | 2 +- metaflow/sidecar/sidecar_subprocess.py | 1 - metaflow/tracing/__init__.py | 1 + metaflow/tracing/span_exporter.py | 15 ++++++++------- metaflow/tracing/tracing_modules.py | 3 +-- 5 files changed, 11 insertions(+), 11 deletions(-) diff --git a/metaflow/multicore_utils.py b/metaflow/multicore_utils.py index 8f5d28ca67c..4cd4d4fde19 100644 --- a/metaflow/multicore_utils.py +++ b/metaflow/multicore_utils.py @@ -6,6 +6,7 @@ from tempfile import NamedTemporaryFile import time import metaflow.tracing as tracing + try: # Python 2 import cPickle as pickle @@ -61,7 +62,6 @@ def _spawn(func, arg, dir): def parallel_imap_unordered(func, iterable, max_parallel=None, dir=None): - if max_parallel is None: max_parallel = cpu_count() diff --git a/metaflow/sidecar/sidecar_subprocess.py b/metaflow/sidecar/sidecar_subprocess.py index 67cf39eaa5a..e58b049f8d1 100644 --- a/metaflow/sidecar/sidecar_subprocess.py +++ b/metaflow/sidecar/sidecar_subprocess.py @@ -68,7 +68,6 @@ def __init__(self, worker_type): self.start() def start(self): - if ( self._worker_type is not None and self._worker_type.startswith(NULL_SIDECAR_PREFIX) diff --git a/metaflow/tracing/__init__.py b/metaflow/tracing/__init__.py index 6094630a7ad..9296babad18 100644 --- a/metaflow/tracing/__init__.py +++ b/metaflow/tracing/__init__.py @@ -49,6 +49,7 @@ def wrapper_func(*args, **kwargs): return wrapper_func + if CONSOLE_TRACE_ENABLED or OTEL_ENDPOINT or ZIPKIN_ENDPOINT: try: from .tracing_modules import * diff --git a/metaflow/tracing/span_exporter.py b/metaflow/tracing/span_exporter.py index 45a3e4dc794..4a3c4971307 100644 --- a/metaflow/tracing/span_exporter.py +++ b/metaflow/tracing/span_exporter.py @@ -2,7 +2,7 @@ from metaflow.metaflow_config import ( OTEL_ENDPOINT, ZIPKIN_ENDPOINT, - CONSOLE_TRACE_ENABLED + CONSOLE_TRACE_ENABLED, ) from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( @@ -10,12 +10,13 @@ ) from opentelemetry.exporter.zipkin.proto.http import ZipkinExporter -from opentelemetry.sdk.trace.export import ConsoleSpanExporter +from opentelemetry.sdk.trace.export import ConsoleSpanExporter + def get_span_exporter(): if OTEL_ENDPOINT: return set_otel_exporter() - + elif ZIPKIN_ENDPOINT: return set_zipkin_exporter() elif CONSOLE_TRACE_ENABLED: @@ -25,7 +26,6 @@ def get_span_exporter(): return - def set_otel_exporter(): from metaflow.metaflow_config import ( SERVICE_AUTH_KEY, @@ -49,13 +49,14 @@ def set_otel_exporter(): return return span_exporter + def set_zipkin_exporter(): span_exporter = ZipkinExporter( - endpoint=ZIPKIN_ENDPOINT, - ) + endpoint=ZIPKIN_ENDPOINT, + ) return span_exporter + def set_console_exporter(): span_exporter = ConsoleSpanExporter() return span_exporter - diff --git a/metaflow/tracing/tracing_modules.py b/metaflow/tracing/tracing_modules.py index 503b9ebf98a..e46889f7375 100644 --- a/metaflow/tracing/tracing_modules.py +++ b/metaflow/tracing/tracing_modules.py @@ -15,6 +15,7 @@ tracer_provider = None + def init_tracing(): global tracer_provider if tracer_provider is not None: @@ -26,7 +27,6 @@ def init_tracing(): set_global_textmap(EnvPropagator(None)) span_exporter = get_span_exporter() - if "METAFLOW_KUBERNETES_POD_NAMESPACE" in os.environ: service_name = "metaflow-kubernetes" elif "AWS_BATCH_JOB_ID" in os.environ: @@ -47,7 +47,6 @@ def init_tracing(): RequestsInstrumentor().instrument() - @contextlib.contextmanager def post_fork(): global tracer_provider From 436eee4169f5c93805647b8ac850e7a1e8be0764 Mon Sep 17 00:00:00 2001 From: Chaoying Wang Date: Mon, 3 Jul 2023 02:00:16 +0000 Subject: [PATCH 04/10] remove unused import --- metaflow/cmd/configure_cmd.py | 1 - 1 file changed, 1 deletion(-) diff --git a/metaflow/cmd/configure_cmd.py b/metaflow/cmd/configure_cmd.py index af5bb9dc42a..d4ef1a5a541 100644 --- a/metaflow/cmd/configure_cmd.py +++ b/metaflow/cmd/configure_cmd.py @@ -7,7 +7,6 @@ from metaflow.util import to_unicode from metaflow._vendor import click from metaflow.util import to_unicode -import metaflow.tracing as tracing from .util import echo_always, makedirs From 8eb9559d3ff4fa926937a118f525ed9c333af241 Mon Sep 17 00:00:00 2001 From: Chaoying Wang Date: Thu, 13 Jul 2023 23:17:01 +0000 Subject: [PATCH 05/10] change exporter to conditional import --- metaflow/tracing/span_exporter.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/metaflow/tracing/span_exporter.py b/metaflow/tracing/span_exporter.py index 4a3c4971307..6898be95eb7 100644 --- a/metaflow/tracing/span_exporter.py +++ b/metaflow/tracing/span_exporter.py @@ -5,13 +5,6 @@ CONSOLE_TRACE_ENABLED, ) -from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( - OTLPSpanExporter, -) -from opentelemetry.exporter.zipkin.proto.http import ZipkinExporter - -from opentelemetry.sdk.trace.export import ConsoleSpanExporter - def get_span_exporter(): if OTEL_ENDPOINT: @@ -19,6 +12,7 @@ def get_span_exporter(): elif ZIPKIN_ENDPOINT: return set_zipkin_exporter() + elif CONSOLE_TRACE_ENABLED: return set_console_exporter() else: @@ -27,6 +21,8 @@ def get_span_exporter(): def set_otel_exporter(): + from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter + from metaflow.metaflow_config import ( SERVICE_AUTH_KEY, SERVICE_HEADERS, @@ -51,6 +47,8 @@ def set_otel_exporter(): def set_zipkin_exporter(): + from opentelemetry.exporter.zipkin.proto.http import ZipkinExporter + span_exporter = ZipkinExporter( endpoint=ZIPKIN_ENDPOINT, ) @@ -58,5 +56,7 @@ def set_zipkin_exporter(): def set_console_exporter(): + from opentelemetry.sdk.trace.export import ConsoleSpanExporter + span_exporter = ConsoleSpanExporter() return span_exporter From c4e4d3d860a3e41794c69c523ead0383c8dc8b6a Mon Sep 17 00:00:00 2001 From: Sakari Ikonen Date: Fri, 18 Aug 2023 15:03:13 +0300 Subject: [PATCH 06/10] remove unused imports --- metaflow/tracing/__init__.py | 1 - metaflow/tracing/propagator.py | 2 -- 2 files changed, 3 deletions(-) diff --git a/metaflow/tracing/__init__.py b/metaflow/tracing/__init__.py index 9296babad18..72da6682d0f 100644 --- a/metaflow/tracing/__init__.py +++ b/metaflow/tracing/__init__.py @@ -1,4 +1,3 @@ -import sys from metaflow.metaflow_config import ( OTEL_ENDPOINT, ZIPKIN_ENDPOINT, diff --git a/metaflow/tracing/propagator.py b/metaflow/tracing/propagator.py index e005f18a20b..acd2134d8b2 100644 --- a/metaflow/tracing/propagator.py +++ b/metaflow/tracing/propagator.py @@ -14,7 +14,6 @@ import os import typing -import logging from opentelemetry.context import Context @@ -27,7 +26,6 @@ Setter, TextMapPropagator, CarrierT, - CarrierValT, ) From 9c31d3b76ee4ee48924b56a9b057d6984c00f99f Mon Sep 17 00:00:00 2001 From: Sakari Ikonen Date: Tue, 22 Aug 2023 12:04:12 +0300 Subject: [PATCH 07/10] prevent tracing module loading during conda bootstrapping process. --- metaflow/metaflow_config.py | 4 ++++ metaflow/plugins/pypi/conda_environment.py | 6 +++++- metaflow/tracing/__init__.py | 5 ++++- 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/metaflow/metaflow_config.py b/metaflow/metaflow_config.py index 09ea9529ea7..a0f04e08500 100644 --- a/metaflow/metaflow_config.py +++ b/metaflow/metaflow_config.py @@ -402,6 +402,10 @@ OTEL_ENDPOINT = from_conf("OTEL_ENDPOINT") ZIPKIN_ENDPOINT = from_conf("ZIPKIN_ENDPOINT") CONSOLE_TRACE_ENABLED = from_conf("CONSOLE_TRACE_ENABLED", False) +# internal env used for preventing the tracing module from loading during Conda bootstrapping. +BOOTSTRAPPING_CONDA_ENVIRONMENT = bool( + os.environ.get("BOOTSTRAPPING_CONDA_ENVIRONMENT", False) +) # MAX_ATTEMPTS is the maximum number of attempts, including the first # task, retries, and the final fallback task and its retries. diff --git a/metaflow/plugins/pypi/conda_environment.py b/metaflow/plugins/pypi/conda_environment.py index c497d79fced..6d21d5c9c24 100644 --- a/metaflow/plugins/pypi/conda_environment.py +++ b/metaflow/plugins/pypi/conda_environment.py @@ -320,7 +320,11 @@ def bootstrap_commands(self, step_name, datastore_type): if id_: return [ "echo 'Bootstrapping virtual environment...'", - 'python -m metaflow.plugins.pypi.bootstrap "%s" %s "%s" linux-64' + # We have to prevent the tracing module from loading, + # as the bootstrapping process uses the internal S3 client which would fail to import tracing + # due to the required dependencies being bundled into the conda environment, + # which is yet to be initialized at this point. + 'BOOTSTRAPPING_CONDA_ENVIRONMENT=True python -m metaflow.plugins.pypi.bootstrap "%s" %s "%s" linux-64' % (self.flow.name, id_, self.datastore_type), "echo 'Environment bootstrapped.'", "export PATH=$PATH:$(pwd)/micromamba", diff --git a/metaflow/tracing/__init__.py b/metaflow/tracing/__init__.py index 72da6682d0f..aadb7793c6c 100644 --- a/metaflow/tracing/__init__.py +++ b/metaflow/tracing/__init__.py @@ -2,6 +2,7 @@ OTEL_ENDPOINT, ZIPKIN_ENDPOINT, CONSOLE_TRACE_ENABLED, + BOOTSTRAPPING_CONDA_ENVIRONMENT, ) from functools import wraps import contextlib @@ -49,7 +50,9 @@ def wrapper_func(*args, **kwargs): return wrapper_func -if CONSOLE_TRACE_ENABLED or OTEL_ENDPOINT or ZIPKIN_ENDPOINT: +if not BOOTSTRAPPING_CONDA_ENVIRONMENT and ( + CONSOLE_TRACE_ENABLED or OTEL_ENDPOINT or ZIPKIN_ENDPOINT +): try: from .tracing_modules import * From 85a43dc5b0690f2595f7089a025f8ce3b7cef603 Mon Sep 17 00:00:00 2001 From: Sakari Ikonen Date: Tue, 22 Aug 2023 13:47:01 +0300 Subject: [PATCH 08/10] explicitly import tracing functions from tracing_modules --- metaflow/tracing/__init__.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/metaflow/tracing/__init__.py b/metaflow/tracing/__init__.py index aadb7793c6c..5472c71258c 100644 --- a/metaflow/tracing/__init__.py +++ b/metaflow/tracing/__init__.py @@ -54,7 +54,16 @@ def wrapper_func(*args, **kwargs): CONSOLE_TRACE_ENABLED or OTEL_ENDPOINT or ZIPKIN_ENDPOINT ): try: - from .tracing_modules import * + # Overrides No-Op implementations if a specific provider is configured. + from .tracing_modules import ( + init_tracing, + post_fork, + cli_entrypoint, + inject_tracing_vars, + get_trace_id, + traced, + tracing, + ) except ImportError as e: print(e.msg) From 6d5afc6aafaf8ad146c68dd0bbcc6e021f64b546 Mon Sep 17 00:00:00 2001 From: Sakari Ikonen Date: Wed, 18 Oct 2023 19:27:43 +0300 Subject: [PATCH 09/10] rename env var for tracing disable during bootstrapping. --- metaflow/metaflow_config.py | 4 +--- metaflow/plugins/pypi/conda_environment.py | 2 +- metaflow/tracing/__init__.py | 6 ++---- 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/metaflow/metaflow_config.py b/metaflow/metaflow_config.py index a0f04e08500..d857fbb0d88 100644 --- a/metaflow/metaflow_config.py +++ b/metaflow/metaflow_config.py @@ -403,9 +403,7 @@ ZIPKIN_ENDPOINT = from_conf("ZIPKIN_ENDPOINT") CONSOLE_TRACE_ENABLED = from_conf("CONSOLE_TRACE_ENABLED", False) # internal env used for preventing the tracing module from loading during Conda bootstrapping. -BOOTSTRAPPING_CONDA_ENVIRONMENT = bool( - os.environ.get("BOOTSTRAPPING_CONDA_ENVIRONMENT", False) -) +DISABLE_TRACING = bool(os.environ.get("DISABLE_TRACING", False)) # MAX_ATTEMPTS is the maximum number of attempts, including the first # task, retries, and the final fallback task and its retries. diff --git a/metaflow/plugins/pypi/conda_environment.py b/metaflow/plugins/pypi/conda_environment.py index 6d21d5c9c24..9e0de9f0bee 100644 --- a/metaflow/plugins/pypi/conda_environment.py +++ b/metaflow/plugins/pypi/conda_environment.py @@ -324,7 +324,7 @@ def bootstrap_commands(self, step_name, datastore_type): # as the bootstrapping process uses the internal S3 client which would fail to import tracing # due to the required dependencies being bundled into the conda environment, # which is yet to be initialized at this point. - 'BOOTSTRAPPING_CONDA_ENVIRONMENT=True python -m metaflow.plugins.pypi.bootstrap "%s" %s "%s" linux-64' + 'DISABLE_TRACING=True python -m metaflow.plugins.pypi.bootstrap "%s" %s "%s" linux-64' % (self.flow.name, id_, self.datastore_type), "echo 'Environment bootstrapped.'", "export PATH=$PATH:$(pwd)/micromamba", diff --git a/metaflow/tracing/__init__.py b/metaflow/tracing/__init__.py index 5472c71258c..24603b8082e 100644 --- a/metaflow/tracing/__init__.py +++ b/metaflow/tracing/__init__.py @@ -2,7 +2,7 @@ OTEL_ENDPOINT, ZIPKIN_ENDPOINT, CONSOLE_TRACE_ENABLED, - BOOTSTRAPPING_CONDA_ENVIRONMENT, + DISABLE_TRACING, ) from functools import wraps import contextlib @@ -50,9 +50,7 @@ def wrapper_func(*args, **kwargs): return wrapper_func -if not BOOTSTRAPPING_CONDA_ENVIRONMENT and ( - CONSOLE_TRACE_ENABLED or OTEL_ENDPOINT or ZIPKIN_ENDPOINT -): +if not DISABLE_TRACING and (CONSOLE_TRACE_ENABLED or OTEL_ENDPOINT or ZIPKIN_ENDPOINT): try: # Overrides No-Op implementations if a specific provider is configured. from .tracing_modules import ( From 4f64784374a3241b3e7f0459f2fe0b1007899bc8 Mon Sep 17 00:00:00 2001 From: Sakari Ikonen Date: Thu, 19 Oct 2023 16:39:10 +0300 Subject: [PATCH 10/10] fix R support --- metaflow/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metaflow/cli.py b/metaflow/cli.py index ea223424619..7c13f5f4362 100644 --- a/metaflow/cli.py +++ b/metaflow/cli.py @@ -1175,7 +1175,7 @@ def main(flow, args=None, handle_exceptions=True, entrypoint=None): start(auto_envvar_prefix="METAFLOW", obj=state) else: try: - start.main(args=args, obj=state, auto_envvar_prefix="METAFLOW") + start(args=args, obj=state, auto_envvar_prefix="METAFLOW") except SystemExit as e: return e.code except MetaflowException as x: