
Commit

Capture stack traces from Argo errors (Netflix#1941)
* add option in Argo Workflows to capture the first error

* edits

* bug fix

---------

Co-authored-by: Saurabh Garg <[email protected]>
savingoyal and iamsgarg-ob authored Aug 5, 2024
1 parent a7c7882 commit c0357f4
Showing 6 changed files with 271 additions and 19 deletions.
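
In short, this change adds an opt-in exit hook to Metaflow's Argo Workflows integration. Deploying with --enable-error-msg-capture (for example: python myflow.py argo-workflows create --enable-error-msg-capture, where myflow.py is a placeholder flow file) sets an onExit handler, capture-error-hook-fn-preflight, that runs whenever the workflow does not finish successfully. The handler reuses the start step's container image to run python -m metaflow.plugins.argo.capture_error and, if the new ARGO_WORKFLOWS_CAPTURE_ERROR_SCRIPT config value is set, evals that script afterwards.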
5 changes: 5 additions & 0 deletions metaflow/metaflow_config.py
@@ -238,6 +238,11 @@
###
UI_URL = from_conf("UI_URL")

###
# Optional script evaluated by the Argo Workflows error-capture exit hook
###
ARGO_WORKFLOWS_CAPTURE_ERROR_SCRIPT = from_conf("ARGO_WORKFLOWS_CAPTURE_ERROR_SCRIPT")

# Contact information displayed when running the `metaflow` command.
# Value should be a dictionary where:
# - key is a string describing contact method
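
Usage sketch (not part of this commit): since the value comes from from_conf, it can presumably also be supplied through the METAFLOW_-prefixed environment variable or the Metaflow config file; the script below is purely illustrative.

    # Hypothetical configuration; the exit hook evals this script after the
    # stack trace has been captured.
    import os

    os.environ["METAFLOW_ARGO_WORKFLOWS_CAPTURE_ERROR_SCRIPT"] = (
        'echo "error capture hook finished"'
    )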
192 changes: 179 additions & 13 deletions metaflow/plugins/argo/argo_workflows.py
@@ -4,15 +4,15 @@
import re
import shlex
import sys
from typing import Tuple, List
from collections import defaultdict
from hashlib import sha1
from math import inf
from typing import List, Tuple

from metaflow import JSONType, current
from metaflow.graph import DAGNode
from metaflow.decorators import flow_decorators
from metaflow.exception import MetaflowException
from metaflow.graph import DAGNode, FlowGraph
from metaflow.includefile import FilePathClass
from metaflow.metaflow_config import (
ARGO_EVENTS_EVENT,
@@ -21,10 +21,12 @@
ARGO_EVENTS_INTERNAL_WEBHOOK_URL,
ARGO_EVENTS_SERVICE_ACCOUNT,
ARGO_EVENTS_WEBHOOK_AUTH,
ARGO_WORKFLOWS_CAPTURE_ERROR_SCRIPT,
ARGO_WORKFLOWS_ENV_VARS_TO_SKIP,
ARGO_WORKFLOWS_KUBERNETES_SECRETS,
ARGO_WORKFLOWS_UI_URL,
AWS_SECRETS_MANAGER_DEFAULT_REGION,
AZURE_KEY_VAULT_PREFIX,
AZURE_STORAGE_BLOB_SERVICE_ENDPOINT,
CARD_AZUREROOT,
CARD_GSROOT,
@@ -36,7 +38,6 @@
DEFAULT_METADATA,
DEFAULT_SECRETS_BACKEND_TYPE,
GCP_SECRET_MANAGER_PREFIX,
AZURE_KEY_VAULT_PREFIX,
KUBERNETES_FETCH_EC2_METADATA,
KUBERNETES_LABELS,
KUBERNETES_NAMESPACE,
@@ -49,25 +50,22 @@
SERVICE_INTERNAL_URL,
UI_URL,
)
from metaflow.unbounded_foreach import UBF_CONTROL, UBF_TASK
from metaflow.metaflow_config_funcs import config_values
from metaflow.mflog import BASH_SAVE_LOGS, bash_capture_logs, export_mflog_env_vars
from metaflow.parameters import deploy_time_eval
from metaflow.plugins.kubernetes.kubernetes import (
parse_kube_keyvalue_list,
validate_kube_labels,
)
from metaflow.graph import FlowGraph
from metaflow.plugins.kubernetes.kubernetes_jobsets import KubernetesArgoJobSet
from metaflow.unbounded_foreach import UBF_CONTROL, UBF_TASK
from metaflow.util import (
compress_list,
dict_to_cli_options,
to_bytes,
to_camelcase,
to_unicode,
)
from metaflow.plugins.kubernetes.kubernetes_jobsets import (
KubernetesArgoJobSet,
)

from .argo_client import ArgoClient

@@ -118,6 +116,7 @@ def __init__(
notify_slack_webhook_url=None,
notify_pager_duty_integration_key=None,
enable_heartbeat_daemon=True,
enable_error_msg_capture=False,
):
# Some high-level notes -
#
@@ -166,7 +165,7 @@ def __init__(
self.notify_slack_webhook_url = notify_slack_webhook_url
self.notify_pager_duty_integration_key = notify_pager_duty_integration_key
self.enable_heartbeat_daemon = enable_heartbeat_daemon

self.enable_error_msg_capture = enable_error_msg_capture
self.parameters = self._process_parameters()
self.triggers, self.trigger_options = self._process_triggers()
self._schedule, self._timezone = self._get_schedule()
@@ -786,6 +785,12 @@ def _compile_workflow_template(self):
)
# Set the entrypoint to flow name
.entrypoint(self.flow.name)
# OnExit hooks
.onExit(
"capture-error-hook-fn-preflight"
if self.enable_error_msg_capture
else None
)
# Set exit hook handlers if notifications are enabled
.hooks(
{
@@ -1063,7 +1068,7 @@ def _visit(
"%s-foreach-%s"
% (
node.name,
"parallel" if node.parallel_foreach else node.foreach_param
"parallel" if node.parallel_foreach else node.foreach_param,
# Since foreach's are derived based on `self.next(self.a, foreach="<varname>")`
# vs @parallel foreach are done based on `self.next(self.a, num_parallel="<some-number>")`,
# we need to ensure that `foreach_template_name` suffix is appropriately set based on the kind
@@ -1849,9 +1854,11 @@ def _container_templates(self):
list(
[]
if not resources.get("secrets")
else [resources.get("secrets")]
if isinstance(resources.get("secrets"), str)
else resources.get("secrets")
else (
[resources.get("secrets")]
if isinstance(resources.get("secrets"), str)
else resources.get("secrets")
)
)
+ KUBERNETES_SECRETS.split(",")
+ ARGO_WORKFLOWS_KUBERNETES_SECRETS.split(",")
@@ -2162,8 +2169,122 @@ def _exit_hook_templates(self):
.success_condition("true == true")
)
)
if self.enable_error_msg_capture:
templates.extend(self._error_msg_capture_hook_templates())
return templates

def _error_msg_capture_hook_templates(self):
from kubernetes import client as kubernetes_sdk

start_step = [step for step in self.graph if step.name == "start"][0]
# We want to grab the base image used by the start step, as this is known to be pullable from within the cluster,
# and it might contain the required libraries, allowing us to start up faster.
resources = dict(
[deco for deco in start_step.decorators if deco.name == "kubernetes"][
0
].attributes
)

run_id_template = "argo-{{workflow.name}}"
metaflow_version = self.environment.get_environment_info()
metaflow_version["flow_name"] = self.graph.name
metaflow_version["production_token"] = self.production_token

mflog_expr = export_mflog_env_vars(
datastore_type=self.flow_datastore.TYPE,
stdout_path="$PWD/.logs/mflog_stdout",
stderr_path="$PWD/.logs/mflog_stderr",
flow_name=self.flow.name,
run_id=run_id_template,
step_name="_run_capture_error",
task_id="1",
retry_count="0",
)

cmds = " && ".join(
[
# For supporting sandboxes, ensure that a custom script is executed
# before anything else is executed. The script is passed in as an
# env var.
'${METAFLOW_INIT_SCRIPT:+eval \\"${METAFLOW_INIT_SCRIPT}\\"}',
"mkdir -p $PWD/.logs",
mflog_expr,
]
+ self.environment.get_package_commands(
self.code_package_url, self.flow_datastore.TYPE
)[:-1]
# Replace the line 'Task is starting.'
# FIXME: this can be brittle.
+ ["mflog 'Error capture hook is starting.'"]
+ ["python -m metaflow.plugins.argo.capture_error"]
+ [
'if [ -n \\"${ARGO_WORKFLOWS_CAPTURE_ERROR_SCRIPT}\\" ]; then eval \\"${ARGO_WORKFLOWS_CAPTURE_ERROR_SCRIPT}\\"; fi'
]
)
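# Illustration (not part of this diff): the joined command roughly expands to
#   bash -c "<init script, if any> && mkdir -p $PWD/.logs && <mflog env exports>
#     && <code package download and setup> && mflog 'Error capture hook is starting.'
#     && python -m metaflow.plugins.argo.capture_error
#     && <eval ARGO_WORKFLOWS_CAPTURE_ERROR_SCRIPT, if set>"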

# TODO: Also capture the first failed task id
cmds = shlex.split('bash -c "%s"' % cmds)
env = {
# These values are needed by Metaflow to set its internal
# state appropriately.
"METAFLOW_CODE_URL": self.code_package_url,
"METAFLOW_CODE_SHA": self.code_package_sha,
"METAFLOW_CODE_DS": self.flow_datastore.TYPE,
"METAFLOW_SERVICE_URL": SERVICE_INTERNAL_URL,
"METAFLOW_SERVICE_HEADERS": json.dumps(SERVICE_HEADERS),
"METAFLOW_USER": "argo-workflows",
"METAFLOW_DEFAULT_DATASTORE": self.flow_datastore.TYPE,
"METAFLOW_DEFAULT_METADATA": DEFAULT_METADATA,
"METAFLOW_OWNER": self.username,
}
# support Metaflow sandboxes
env["METAFLOW_INIT_SCRIPT"] = KUBERNETES_SANDBOX_INIT_SCRIPT
env["ARGO_WORKFLOWS_CAPTURE_ERROR_SCRIPT"] = ARGO_WORKFLOWS_CAPTURE_ERROR_SCRIPT

env["METAFLOW_ARGO_WORKFLOW_FAILURES"] = "{{workflow.failures}}"
env = {
k: v
for k, v in env.items()
if v is not None
and k not in set(ARGO_WORKFLOWS_ENV_VARS_TO_SKIP.split(","))
}
return [
Template("error-msg-capture-hook").container(
to_camelcase(
kubernetes_sdk.V1Container(
name="main",
command=cmds,
image=resources["image"],
env=[
kubernetes_sdk.V1EnvVar(name=k, value=str(v))
for k, v in env.items()
],
resources=kubernetes_sdk.V1ResourceRequirements(
# NOTE: base resources for this hook are kept to a minimum to save on running costs.
# This has an adverse effect on the hook's startup time, which can be completely
# alleviated by using a base image that has the required dependencies pre-installed.
requests={
"cpu": "200m",
"memory": "100Mi",
},
limits={
"cpu": "200m",
"memory": "100Mi",
},
),
)
)
),
Template("capture-error-hook-fn-preflight").steps(
[
WorkflowStep()
.name("capture-error-hook-fn-preflight")
.template("error-msg-capture-hook")
.when("{{workflow.status}} != Succeeded")
]
),
]

def _pager_duty_alert_template(self):
# https://developer.pagerduty.com/docs/ZG9jOjExMDI5NTgx-send-an-alert-event
if self.notify_pager_duty_integration_key is None:
@@ -2907,6 +3028,34 @@ def __str__(self):
return json.dumps(self.to_json(), indent=4)


class WorkflowStep(object):
def __init__(self):
tree = lambda: defaultdict(tree)
self.payload = tree()

def name(self, name):
self.payload["name"] = str(name)
return self

def template(self, template):
self.payload["template"] = str(template)
return self

def when(self, condition):
self.payload["when"] = str(condition)
return self

def step(self, expression):
self.payload["expression"] = str(expression)
return self

def to_json(self):
return self.payload

def __str__(self):
return json.dumps(self.to_json(), indent=4)
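
# Usage sketch (not part of this diff): this is how the exit-hook preflight
# step above is built and serialized with this helper.
#
#   step = (
#       WorkflowStep()
#       .name("capture-error-hook-fn-preflight")
#       .template("error-msg-capture-hook")
#       .when("{{workflow.status}} != Succeeded")
#   )
#   print(step)  # -> {"name": "...", "template": "...", "when": "..."}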


class WorkflowSpec(object):
# https://argoproj.github.io/argo-workflows/fields/#workflowspec
# This object sets all Workflow level properties.
@@ -2937,6 +3086,11 @@ def entrypoint(self, entrypoint):
self.payload["entrypoint"] = entrypoint
return self

def onExit(self, on_exit_template):
if on_exit_template:
self.payload["onExit"] = on_exit_template
return self

def parallelism(self, parallelism):
# Set parallelism at Workflow level
self.payload["parallelism"] = int(parallelism)
@@ -3062,6 +3216,18 @@ def dag(self, dag_template):
self.payload["dag"] = dag_template.to_json()
return self

def steps(self, steps):
if "steps" not in self.payload:
self.payload["steps"] = []
# Argo expects 'steps' to be a list of step groups (a list of lists);
# steps within a group run in parallel, while groups run sequentially.
# Serialize each incoming step and append them to the payload as one group.
step_list = []
for step in steps:
step_list.append(step.to_json())
self.payload["steps"].append(step_list)
return self

def container(self, container):
# Luckily this can simply be V1Container and we are spared from writing more
# boilerplate - https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1Container.md.
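
A rough sketch (not part of this commit) of how a process inside the exit hook could consume the failure details that Argo injects through METAFLOW_ARGO_WORKFLOW_FAILURES. {{workflow.failures}} is expected to render as a JSON list; the exact field names are defined by Argo Workflows, so the ones used below are assumptions.

    import json
    import os

    # {{workflow.failures}} is injected verbatim into this environment variable.
    failures = json.loads(os.environ.get("METAFLOW_ARGO_WORKFLOW_FAILURES") or "[]")
    for failure in failures:
        # Assumed fields; Argo documents entries such as displayName and message.
        print(failure.get("displayName"), failure.get("message"))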
21 changes: 17 additions & 4 deletions metaflow/plugins/argo/argo_workflows_cli.py
@@ -5,11 +5,14 @@
import sys
from hashlib import sha1

from metaflow import Run, JSONType, current, decorators, parameters
from metaflow.client.core import get_metadata
from metaflow.exception import MetaflowNotFound
from metaflow import JSONType, Run, current, decorators, parameters
from metaflow._vendor import click
from metaflow.exception import MetaflowException, MetaflowInternalError
from metaflow.client.core import get_metadata
from metaflow.exception import (
MetaflowException,
MetaflowInternalError,
MetaflowNotFound,
)
from metaflow.metaflow_config import (
ARGO_WORKFLOWS_UI_URL,
KUBERNETES_NAMESPACE,
@@ -181,6 +184,12 @@ def argo_workflows(obj, name=None):
help="Write the workflow name to the file specified. Used internally for Metaflow's Deployer API.",
hidden=True,
)
@click.option(
"--enable-error-msg-capture/--no-enable-error-msg-capture",
default=False,
show_default=True,
help="Capture stack trace of first failed task in exit hook.",
)
@click.pass_obj
def create(
obj,
Expand All @@ -200,6 +209,7 @@ def create(
notify_pager_duty_integration_key=None,
enable_heartbeat_daemon=True,
deployer_attribute_file=None,
enable_error_msg_capture=False,
):
validate_tags(tags)

@@ -248,6 +258,7 @@
notify_slack_webhook_url,
notify_pager_duty_integration_key,
enable_heartbeat_daemon,
enable_error_msg_capture,
)

if only_json:
@@ -421,6 +432,7 @@ def make_flow(
notify_slack_webhook_url,
notify_pager_duty_integration_key,
enable_heartbeat_daemon,
enable_error_msg_capture,
):
# TODO: Make this check less specific to Amazon S3 as we introduce
# support for more cloud object stores.
@@ -484,6 +496,7 @@ def make_flow(
notify_slack_webhook_url=notify_slack_webhook_url,
notify_pager_duty_integration_key=notify_pager_duty_integration_key,
enable_heartbeat_daemon=enable_heartbeat_daemon,
enable_error_msg_capture=enable_error_msg_capture,
)


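
Programmatic usage sketch: the --deployer-attribute-file option above mentions Metaflow's Deployer API, so the new flag should also be reachable from Python. The keyword name is an assumption; it is expected to mirror the CLI option, and myflow.py is a placeholder flow file.

    from metaflow import Deployer

    # Assumed keyword; mirrors --enable-error-msg-capture.
    deployed_flow = Deployer("myflow.py").argo_workflows().create(
        enable_error_msg_capture=True
    )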
(Diffs for the remaining three changed files are not shown.)
