Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Otel oss integration #1462

Merged
merged 10 commits into from
Oct 25, 2023
5 changes: 4 additions & 1 deletion metaflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import traceback
from datetime import datetime
from functools import wraps
import metaflow.tracing as tracing

from metaflow._vendor import click

Expand Down Expand Up @@ -796,6 +797,7 @@ def resume(
runtime.execute()


@tracing.cli_entrypoint("cli/run")
@parameters.add_custom_parameters(deploy_mode=True)
@cli.command(help="Run the workflow locally.")
@common_run_options
Expand Down Expand Up @@ -894,6 +896,7 @@ def version(obj):
echo_always(obj.version)


@tracing.cli_entrypoint("cli/start")
@decorators.add_decorator_options
@click.command(
cls=click.CommandCollection,
Expand Down Expand Up @@ -1172,7 +1175,7 @@ def main(flow, args=None, handle_exceptions=True, entrypoint=None):
start(auto_envvar_prefix="METAFLOW", obj=state)
else:
try:
start.main(args=args, obj=state, auto_envvar_prefix="METAFLOW")
start(args=args, obj=state, auto_envvar_prefix="METAFLOW")
except SystemExit as e:
return e.code
except MetaflowException as x:
Expand Down
2 changes: 2 additions & 0 deletions metaflow/cmd/main_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
from metaflow.metaflow_version import get_version

from .util import echo_always
import metaflow.tracing as tracing


@click.group()
@tracing.cli_entrypoint("cli/main")
def main():
    """Root click command group for the `metaflow` command-line tool.

    The group callback itself does nothing; subcommands are attached to
    this group elsewhere in the module. `tracing.cli_entrypoint("cli/main")`
    wraps the callback so the tracing machinery is entered when the CLI is
    invoked (presumably opening a span named "cli/main" — confirm against
    metaflow.tracing).
    """
    pass

Expand Down
4 changes: 3 additions & 1 deletion metaflow/metadata/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ def _ping(self):

def _heartbeat(self):
if self.hb_url is not None:
response = requests.post(url=self.hb_url, data="{}", headers=self.headers)
response = requests.post(
url=self.hb_url, data="{}", headers=self.headers.copy()
(Review note: saikonen marked this conversation as resolved.)
)
# Unfortunately, response.json() returns a string that we need
# to cast to json; however when the request encounters an error
# the return type is a json blob :/
Expand Down
6 changes: 6 additions & 0 deletions metaflow/metaflow_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,12 @@

KUBERNETES_SANDBOX_INIT_SCRIPT = from_conf("KUBERNETES_SANDBOX_INIT_SCRIPT")

OTEL_ENDPOINT = from_conf("OTEL_ENDPOINT")
ZIPKIN_ENDPOINT = from_conf("ZIPKIN_ENDPOINT")
CONSOLE_TRACE_ENABLED = from_conf("CONSOLE_TRACE_ENABLED", False)
# internal env used for preventing the tracing module from loading during Conda bootstrapping.
DISABLE_TRACING = bool(os.environ.get("DISABLE_TRACING", False))

# MAX_ATTEMPTS is the maximum number of attempts, including the first
# task, retries, and the final fallback task and its retries.
#
Expand Down
3 changes: 3 additions & 0 deletions metaflow/mflog/save_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@
from metaflow.util import Path
from . import TASK_LOG_SOURCE

from metaflow.tracing import cli_entrypoint

SMALL_FILE_LIMIT = 1024 * 1024


@cli_entrypoint("save_logs")
(Review note: saikonen marked this conversation as resolved.)
def save_logs():
def _read_file(path):
with open(path, "rb") as f:
Expand Down
37 changes: 19 additions & 18 deletions metaflow/multicore_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from multiprocessing import cpu_count
from tempfile import NamedTemporaryFile
import time
import metaflow.tracing as tracing

try:
# Python 2
Expand Down Expand Up @@ -40,27 +41,27 @@ def _spawn(func, arg, dir):
if pid:
return pid, output_file
else:
try:
exit_code = 1
ret = func(arg)
with open(output_file, "wb") as f:
pickle.dump(ret, f, protocol=pickle.HIGHEST_PROTOCOL)
exit_code = 0
except:
# we must not let any exceptions escape this function
# which might trigger unintended side effects
traceback.print_exc()
finally:
sys.stderr.flush()
sys.stdout.flush()
# we can't use sys.exit(0) here since it raises SystemExit
# that may have unintended side effects (e.g. triggering
# finally blocks).
os._exit(exit_code)
with tracing.post_fork():
try:
exit_code = 1
ret = func(arg)
with open(output_file, "wb") as f:
pickle.dump(ret, f, protocol=pickle.HIGHEST_PROTOCOL)
exit_code = 0
except:
# we must not let any exceptions escape this function
# which might trigger unintended side-effects
traceback.print_exc()
finally:
sys.stderr.flush()
sys.stdout.flush()
# we can't use sys.exit(0) here since it raises SystemExit
# that may have unintended side-effects (e.g. triggering
# finally blocks).
os._exit(exit_code)


def parallel_imap_unordered(func, iterable, max_parallel=None, dir=None):

if max_parallel is None:
max_parallel = cpu_count()

Expand Down
15 changes: 12 additions & 3 deletions metaflow/plugins/datatools/s3/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
)
from metaflow.exception import MetaflowException
from metaflow.debug import debug
import metaflow.tracing as tracing

try:
# python2
Expand Down Expand Up @@ -1173,9 +1174,14 @@ def put(
def _upload(s3, _):
# We make sure we are at the beginning in case we are retrying
blob.seek(0)
s3.upload_fileobj(
blob, src.netloc, src.path.lstrip("/"), ExtraArgs=extra_args
)

# We use manual tracing here because boto3 instrumentation
# has an issue with upload_fileobj losing track of tracing context
# https://github.com/open-telemetry/opentelemetry-python-contrib/issues/298
with tracing.traced("s3.upload_fileobj", {"path": src.path}):
s3.upload_fileobj(
blob, src.netloc, src.path.lstrip("/"), ExtraArgs=extra_args
)

if overwrite:
self._one_boto_op(_upload, url, create_tmp_file=False)
Expand Down Expand Up @@ -1605,10 +1611,13 @@ def try_s3_op(last_ok_count, pending_retries, out_lines, inject_failures):
try:
debug.s3client_exec(cmdline + addl_cmdline)
# Run the operation.
env = os.environ.copy()
tracing.inject_tracing_vars(env)
(Review note: saikonen marked this conversation as resolved.)
stdout = subprocess.check_output(
cmdline + addl_cmdline,
cwd=self._tmpdir,
stderr=stderr.file,
env=env,
)
# Here we did not have any error -- transient or otherwise.
ok_lines = stdout.splitlines()
Expand Down
5 changes: 5 additions & 0 deletions metaflow/plugins/datatools/s3/s3op.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
TRANSIENT_RETRY_LINE_CONTENT,
TRANSIENT_RETRY_START_LINE,
)
import metaflow.tracing as tracing

NUM_WORKERS_DEFAULT = 64

Expand Down Expand Up @@ -153,6 +154,7 @@ def normalize_client_error(err):
# S3 worker pool


@tracing.cli_entrypoint("s3op/worker")
def worker(result_file_name, queue, mode, s3config):
# Interpret mode, it can either be a single op or something like
# info_download or info_upload which implies:
Expand Down Expand Up @@ -719,6 +721,7 @@ def cli():
pass


@tracing.cli_entrypoint("s3op/list")
@cli.command("list", help="List S3 objects")
@click.option(
"--recursive/--no-recursive",
Expand Down Expand Up @@ -778,6 +781,7 @@ def lst(
print(format_result_line(idx, url.prefix, url.url, str(size)))


@tracing.cli_entrypoint("s3op/put")
@cli.command(help="Upload files to S3")
@click.option(
"--file",
Expand Down Expand Up @@ -972,6 +976,7 @@ def _populate_prefixes(prefixes, inputs):
return prefixes, is_transient_retry


@tracing.cli_entrypoint("s3op/get")
@cli.command(help="Download files from S3")
@click.option(
"--recursive/--no-recursive",
Expand Down
2 changes: 2 additions & 0 deletions metaflow/plugins/kubernetes/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
SERVICE_HEADERS,
SERVICE_INTERNAL_URL,
S3_SERVER_SIDE_ENCRYPTION,
OTEL_ENDPOINT,
)
from metaflow.metaflow_config_funcs import config_values

Expand Down Expand Up @@ -255,6 +256,7 @@ def create_job(
.environment_variable(
"METAFLOW_INIT_SCRIPT", KUBERNETES_SANDBOX_INIT_SCRIPT
)
.environment_variable("METAFLOW_OTEL_ENDPOINT", OTEL_ENDPOINT)
# Skip setting METAFLOW_DATASTORE_SYSROOT_LOCAL because metadata sync
# between the local user instance and the remote Kubernetes pod
# assumes metadata is stored in DATASTORE_LOCAL_DIR on the Kubernetes
Expand Down
2 changes: 2 additions & 0 deletions metaflow/plugins/kubernetes/kubernetes_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from metaflow.metadata.util import sync_local_metadata_from_datastore
from metaflow.metaflow_config import DATASTORE_LOCAL_DIR, KUBERNETES_LABELS
from metaflow.mflog import TASK_LOG_SOURCE
import metaflow.tracing as tracing

from .kubernetes import Kubernetes, KubernetesKilledException, parse_kube_keyvalue_list
from .kubernetes_decorator import KubernetesDecorator
Expand All @@ -24,6 +25,7 @@ def kubernetes():
pass


@tracing.cli_entrypoint("kubernetes/step")
@kubernetes.command(
help="Execute a single task on Kubernetes. This command calls the top-level step "
"command inside a Kubernetes pod with the given options. Typically you do not call "
Expand Down
7 changes: 7 additions & 0 deletions metaflow/plugins/kubernetes/kubernetes_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import random
import time

from metaflow.tracing import inject_tracing_vars


from metaflow.exception import MetaflowException
from metaflow.metaflow_config import KUBERNETES_SECRETS

Expand Down Expand Up @@ -135,6 +138,10 @@ def create(self):
"METAFLOW_KUBERNETES_SERVICE_ACCOUNT_NAME": "spec.serviceAccountName",
"METAFLOW_KUBERNETES_NODE_IP": "status.hostIP",
}.items()
]
+ [
client.V1EnvVar(name=k, value=str(v))
for k, v in inject_tracing_vars({}).items()
Review thread:

saikonen (Collaborator), Aug 17, 2023:
    This seems to lead to unnecessary import errors in the task logs —
    "No module named 'opentelemetry'" — when running with Kubernetes.

Reply (Collaborator):
    This only happens when trying to enable telemetry without including the
    required libraries via either conda or a custom Docker image. I think
    this is a good notification as is, so there is no need to change things.
],
env_from=[
client.V1EnvFromSource(
Expand Down
22 changes: 13 additions & 9 deletions metaflow/plugins/metadata/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def __init__(self, environment, flow, event_logger, monitor):
def compute_info(cls, val):
v = val.rstrip("/")
try:
resp = requests.get(os.path.join(v, "ping"), headers=SERVICE_HEADERS)
resp = requests.get(os.path.join(v, "ping"), headers=SERVICE_HEADERS.copy())
(Review note: saikonen marked this conversation as resolved.)
resp.raise_for_status()
except: # noqa E722
raise ValueError("Metaflow service [%s] unreachable." % v)
Expand Down Expand Up @@ -413,25 +413,29 @@ def _request(
if method == "GET":
if monitor:
with monitor.measure("metaflow.service_metadata.get"):
resp = requests.get(url, headers=SERVICE_HEADERS)
resp = requests.get(url, headers=SERVICE_HEADERS.copy())
else:
resp = requests.get(url, headers=SERVICE_HEADERS)
resp = requests.get(url, headers=SERVICE_HEADERS.copy())
elif method == "POST":
if monitor:
with monitor.measure("metaflow.service_metadata.post"):
resp = requests.post(
url, headers=SERVICE_HEADERS, json=data
url, headers=SERVICE_HEADERS.copy(), json=data
)
else:
resp = requests.post(url, headers=SERVICE_HEADERS, json=data)
resp = requests.post(
url, headers=SERVICE_HEADERS.copy(), json=data
)
elif method == "PATCH":
if monitor:
with monitor.measure("metaflow.service_metadata.patch"):
resp = requests.patch(
url, headers=SERVICE_HEADERS, json=data
url, headers=SERVICE_HEADERS.copy(), json=data
)
else:
resp = requests.patch(url, headers=SERVICE_HEADERS, json=data)
resp = requests.patch(
url, headers=SERVICE_HEADERS.copy(), json=data
)
else:
raise MetaflowInternalError("Unexpected HTTP method %s" % (method,))
except MetaflowInternalError:
Expand Down Expand Up @@ -496,9 +500,9 @@ def _version(cls, monitor):
try:
if monitor:
with monitor.measure("metaflow.service_metadata.get"):
resp = requests.get(url, headers=SERVICE_HEADERS)
resp = requests.get(url, headers=SERVICE_HEADERS.copy())
else:
resp = requests.get(url, headers=SERVICE_HEADERS)
resp = requests.get(url, headers=SERVICE_HEADERS.copy())
except:
if monitor:
with monitor.count("metaflow.service_metadata.failed_request"):
Expand Down
6 changes: 5 additions & 1 deletion metaflow/plugins/pypi/conda_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,11 @@ def bootstrap_commands(self, step_name, datastore_type):
if id_:
return [
"echo 'Bootstrapping virtual environment...'",
'python -m metaflow.plugins.pypi.bootstrap "%s" %s "%s" linux-64'
# We have to prevent the tracing module from loading,
# as the bootstrapping process uses the internal S3 client which would fail to import tracing
# due to the required dependencies being bundled into the conda environment,
# which is yet to be initialized at this point.
'DISABLE_TRACING=True python -m metaflow.plugins.pypi.bootstrap "%s" %s "%s" linux-64'
% (self.flow.name, id_, self.datastore_type),
"echo 'Environment bootstrapped.'",
"export PATH=$PATH:$(pwd)/micromamba",
Expand Down
2 changes: 2 additions & 0 deletions metaflow/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
UBF_CONTROL,
UBF_TASK,
)
import metaflow.tracing as tracing

MAX_WORKERS = 16
MAX_NUM_SPLITS = 100
Expand Down Expand Up @@ -1235,6 +1236,7 @@ def _launch(self):
)
env.update(args.get_env())
env["PYTHONUNBUFFERED"] = "x"
tracing.inject_tracing_vars(env)
# NOTE bufsize=1 below enables line buffering which is required
# by read_logline() below that relies on readline() not blocking
# print('running', args)
Expand Down
5 changes: 4 additions & 1 deletion metaflow/sidecar/sidecar_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from .sidecar_messages import Message, MessageTypes
from ..debug import debug
from metaflow.tracing import inject_tracing_vars

MUST_SEND_RETRY_TIMES = 4
MESSAGE_WRITE_TIMEOUT_IN_MS = 1000
Expand Down Expand Up @@ -67,7 +68,6 @@ def __init__(self, worker_type):
self.start()

def start(self):

if (
self._worker_type is not None
and self._worker_type.startswith(NULL_SIDECAR_PREFIX)
Expand Down Expand Up @@ -127,11 +127,14 @@ def send(self, msg, retries=3):
def _start_subprocess(self, cmdline):
for _ in range(3):
try:
env = os.environ.copy()
inject_tracing_vars(env)
# Set stdout=sys.stdout & stderr=sys.stderr
# to print to console the output of sidecars.
return subprocess.Popen(
cmdline,
stdin=subprocess.PIPE,
env=env,
stdout=sys.stdout if debug.sidecar else subprocess.DEVNULL,
stderr=sys.stderr if debug.sidecar else subprocess.DEVNULL,
bufsize=0,
Expand Down
Loading