diff --git a/devtools/Tiltfile b/devtools/Tiltfile index ac6ba29597b..620671b0137 100644 --- a/devtools/Tiltfile +++ b/devtools/Tiltfile @@ -224,6 +224,19 @@ if "argo-workflows" in enabled_components: ] ) + # This fixes issue described in: https://github.com/argoproj/argo-workflows/issues/10340 + k8s_yaml(encode_yaml({ + 'apiVersion': 'v1', + 'kind': 'Secret', + 'metadata': { + 'name': 'default.service-account-token', + 'annotations': { + 'kubernetes.io/service-account.name': 'default' + } + }, + 'type': 'kubernetes.io/service-account-token' + })) + k8s_yaml(encode_yaml({ 'apiVersion': 'rbac.authorization.k8s.io/v1', 'kind': 'Role', @@ -231,11 +244,23 @@ if "argo-workflows" in enabled_components: 'name': 'argo-workflowtaskresults-role', 'namespace': 'default' }, - 'rules': [{ + 'rules': [ + { 'apiGroups': ['argoproj.io'], 'resources': ['workflowtaskresults'], 'verbs': ['create', 'patch', 'get', 'list'] - }] + }, + { + 'apiGroups': ['argoproj.io'], + 'resources': ['workflowtasksets'], + 'verbs': ['watch', 'list'] + }, + { + 'apiGroups': ['argoproj.io'], + 'resources': ['workflowtasksets/status'], + 'verbs': ['patch'] + }, + ] })) k8s_yaml(encode_yaml({ diff --git a/metaflow/plugins/__init__.py b/metaflow/plugins/__init__.py index f74a70726ba..5f695cc2890 100644 --- a/metaflow/plugins/__init__.py +++ b/metaflow/plugins/__init__.py @@ -69,6 +69,7 @@ ("trigger_on_finish", ".events_decorator.TriggerOnFinishDecorator"), ("pypi_base", ".pypi.pypi_decorator.PyPIFlowDecorator"), ("conda_base", ".pypi.conda_decorator.CondaFlowDecorator"), + ("exit_hook", ".argo.exit_hook_decorator.ExitHookDecorator"), ] # Add environments here diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index af4a510cb5b..83281a44290 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -65,6 +65,7 @@ ) from .argo_client import ArgoClient +from .exit_hooks import ExitHookHack, HttpExitHook, ContainerHook from 
metaflow.util import resolve_identity @@ -795,6 +796,7 @@ def _compile_workflow_template(self): dag_annotation = {"metaflow/dag": json.dumps(graph_info)} + lifecycle_hooks = self._lifecycle_hooks() return ( WorkflowTemplate() .metadata( @@ -903,97 +905,20 @@ def _compile_workflow_template(self): if self.enable_error_msg_capture else None ) - # Set exit hook handlers if notifications are enabled + # Set lifecycle hooks if notifications are enabled .hooks( { - **( - { - # workflow status maps to Completed - "notify-slack-on-success": LifecycleHook() - .expression("workflow.status == 'Succeeded'") - .template("notify-slack-on-success"), - } - if self.notify_on_success and self.notify_slack_webhook_url - else {} - ), - **( - { - # workflow status maps to Completed - "notify-pager-duty-on-success": LifecycleHook() - .expression("workflow.status == 'Succeeded'") - .template("notify-pager-duty-on-success"), - } - if self.notify_on_success - and self.notify_pager_duty_integration_key - else {} - ), - **( - { - # workflow status maps to Completed - "notify-incident-io-on-success": LifecycleHook() - .expression("workflow.status == 'Succeeded'") - .template("notify-incident-io-on-success"), - } - if self.notify_on_success - and self.notify_incident_io_api_key - else {} - ), - **( - { - # workflow status maps to Failed or Error - "notify-slack-on-failure": LifecycleHook() - .expression("workflow.status == 'Failed'") - .template("notify-slack-on-error"), - "notify-slack-on-error": LifecycleHook() - .expression("workflow.status == 'Error'") - .template("notify-slack-on-error"), - } - if self.notify_on_error and self.notify_slack_webhook_url - else {} - ), - **( - { - # workflow status maps to Failed or Error - "notify-pager-duty-on-failure": LifecycleHook() - .expression("workflow.status == 'Failed'") - .template("notify-pager-duty-on-error"), - "notify-pager-duty-on-error": LifecycleHook() - .expression("workflow.status == 'Error'") - .template("notify-pager-duty-on-error"), - 
} - if self.notify_on_error - and self.notify_pager_duty_integration_key - else {} - ), - **( - { - # workflow status maps to Failed or Error - "notify-incident-io-on-failure": LifecycleHook() - .expression("workflow.status == 'Failed'") - .template("notify-incident-io-on-error"), - "notify-incident-io-on-error": LifecycleHook() - .expression("workflow.status == 'Error'") - .template("notify-incident-io-on-error"), - } - if self.notify_on_error and self.notify_incident_io_api_key - else {} - ), - # Warning: terrible hack to workaround a bug in Argo Workflow - # where the hooks listed above do not execute unless - # there is an explicit exit hook. as and when this - # bug is patched, we should remove this effectively - # no-op hook. - **( - {"exit": LifecycleHook().template("exit-hook-hack")} - if self.notify_on_error or self.notify_on_success - else {} - ), + lifecycle.name: lifecycle + for hook in lifecycle_hooks + for lifecycle in hook.lifecycle_hooks } ) # Top-level DAG template(s) .templates(self._dag_templates()) # Container templates .templates(self._container_templates()) + # Lifecycle hook template(s) + .templates([hook.template for hook in lifecycle_hooks]) # Exit hook template(s) .templates(self._exit_hook_templates()) # Sidecar templates (Daemon Containers) @@ -2333,40 +2258,183 @@ def _daemon_templates(self): templates.append(self._heartbeat_daemon_template()) return templates - # Return exit hook templates for workflow execution notifications. - def _exit_hook_templates(self): - templates = [] + # Return lifecycle hooks for workflow execution notifications. 
# NOTE: the three functions below are methods (they take ``self``) of the
# workflow compiler in metaflow/plugins/argo/argo_workflows.py.


def _lifecycle_hooks(self):
    """Collect every lifecycle hook that should be attached to the workflow.

    Notification hooks (Slack, PagerDuty, incident.io) are added according to
    ``notify_on_error`` / ``notify_on_success``; hooks declared through
    ``@exit_hook`` flow decorators are appended afterwards.

    Returns
    -------
    list
        Hook objects. When at least one hook is present, an ``ExitHookHack``
        is appended as the last element; otherwise the list is empty.
    """
    hooks = []
    if self.notify_on_error:
        hooks.append(self._slack_error_template())
        hooks.append(self._pager_duty_alert_template())
        hooks.append(self._incident_io_alert_template())
    if self.notify_on_success:
        hooks.append(self._slack_success_template())
        hooks.append(self._pager_duty_change_template())
        hooks.append(self._incident_io_change_template())

    for exit_hook_deco in self.flow._flow_decorators.get("exit_hook", []):
        hooks.extend(self._lifecycle_hook_from_deco(exit_hook_deco))

    # The template builders return None when their integration is not
    # configured; drop those entries.
    hooks = [hook for hook in hooks if hook]

    if hooks:
        # The explicit "exit" hook must accompany any other hook
        # (see the warning on ExitHookHack).
        hooks.append(
            ExitHookHack(
                url=(
                    self.notify_slack_webhook_url
                    or "https://events.pagerduty.com/v2/enqueue"
                )
            )
        )
    return hooks


def _lifecycle_hook_from_deco(self, deco):
    """Build container-based lifecycle hooks for one ``@exit_hook`` decorator.

    Parameters
    ----------
    deco
        An exit-hook flow decorator exposing ``success_hooks`` and
        ``error_hooks`` (lists of function names) and ``attributes["image"]``.

    Returns
    -------
    list
        One ``ContainerHook`` per name in ``deco.success_hooks`` followed by
        one per name in ``deco.error_hooks``.

    Raises
    ------
    IndexError
        If the graph has no ``start`` step or that step carries no
        ``kubernetes`` decorator.
    """
    from kubernetes import client as kubernetes_sdk

    start_step = [step for step in self.graph if step.name == "start"][0]
    # We want to grab the base image used by the start step, as this is known
    # to be pullable from within the cluster, and it might contain the
    # required libraries, allowing us to start up faster.
    # The loop variable is deliberately not called `deco` so that it cannot be
    # confused with the decorator passed to this method.
    resources = dict(
        [
            step_deco
            for step_deco in start_step.decorators
            if step_deco.name == "kubernetes"
        ][0].attributes
    )

    run_id_template = "argo-{{workflow.name}}"
    metaflow_version = self.environment.get_environment_info()
    # Drop only a trailing ".py" suffix. `str.rstrip(".py")` would strip any
    # run of the characters '.', 'p' and 'y' ("happy.py" -> "ha") and produce
    # a module name that cannot be imported.
    script_name = metaflow_version["script"]
    script_import = (
        script_name[: -len(".py")] if script_name.endswith(".py") else script_name
    )
    # NOTE(review): these two keys are not read again inside this method.
    metaflow_version["flow_name"] = self.graph.name
    metaflow_version["production_token"] = self.production_token
    env = {
        # These values are needed by Metaflow to set its internal
        # state appropriately.
        "METAFLOW_CODE_URL": self.code_package_url,
        "METAFLOW_CODE_SHA": self.code_package_sha,
        "METAFLOW_CODE_DS": self.flow_datastore.TYPE,
        "METAFLOW_SERVICE_URL": SERVICE_INTERNAL_URL,
        "METAFLOW_SERVICE_HEADERS": json.dumps(SERVICE_HEADERS),
        "METAFLOW_USER": "argo-workflows",
        "METAFLOW_DEFAULT_DATASTORE": self.flow_datastore.TYPE,
        "METAFLOW_DEFAULT_METADATA": DEFAULT_METADATA,
        "METAFLOW_OWNER": self.username,
    }
    # support Metaflow sandboxes
    env["METAFLOW_INIT_SCRIPT"] = KUBERNETES_SANDBOX_INIT_SCRIPT

    env["METAFLOW_WORKFLOW_NAME"] = "{{workflow.name}}"
    env["METAFLOW_WORKFLOW_NAMESPACE"] = "{{workflow.namespace}}"

    # Build the skip set once instead of once per environment variable.
    skipped_env_vars = set(ARGO_WORKFLOWS_ENV_VARS_TO_SKIP.split(","))
    env = {
        k: v
        for k, v in env.items()
        if v is not None and k not in skipped_env_vars
    }

    # Secrets declared on the start step may be a single name or a list of
    # names; globally configured secrets are appended and empty names dropped.
    declared_secrets = resources.get("secrets")
    if not declared_secrets:
        declared_secrets = []
    elif isinstance(declared_secrets, str):
        declared_secrets = [declared_secrets]
    secret_names = [
        str(k)
        for k in list(declared_secrets)
        + KUBERNETES_SECRETS.split(",")
        + ARGO_WORKFLOWS_KUBERNETES_SECRETS.split(",")
        if k
    ]

    def _cmd(fn_name):
        """Return the argv that runs hook function `fn_name` of the flow."""
        mflog_expr = export_mflog_env_vars(
            datastore_type=self.flow_datastore.TYPE,
            stdout_path="$PWD/.logs/mflog_stdout",
            stderr_path="$PWD/.logs/mflog_stderr",
            flow_name=self.flow.name,
            run_id=run_id_template,
            step_name=f"_hook_{fn_name}",
            task_id="1",
            retry_count="0",
        )
        cmds = " && ".join(
            [
                # For supporting sandboxes, ensure that a custom script is
                # executed before anything else is executed. The script is
                # passed in as an env var.
                '${METAFLOW_INIT_SCRIPT:+eval \\"${METAFLOW_INIT_SCRIPT}\\"}',
                "mkdir -p $PWD/.logs",
                mflog_expr,
            ]
            + self.environment.get_package_commands(
                self.code_package_url, self.flow_datastore.TYPE
            )[:-1]
            # Replace the line 'Task is starting'
            + [f"mflog 'Lifecycle hook {fn_name} is starting.'"]
            + [
                f"python -c 'import {script_import} as tempflow; tempflow.{fn_name}();'"
            ]
        )

        return shlex.split('bash -c "%s"' % cmds)

    def _container(cmds):
        """Return the camel-cased container spec that executes `cmds`."""
        return to_camelcase(
            kubernetes_sdk.V1Container(
                name="main",
                command=cmds,
                # An image set on the decorator wins over the start step's.
                image=deco.attributes["image"] or resources["image"],
                env=[
                    kubernetes_sdk.V1EnvVar(name=k, value=str(v))
                    for k, v in env.items()
                ],
                env_from=[
                    kubernetes_sdk.V1EnvFromSource(
                        secret_ref=kubernetes_sdk.V1SecretEnvSource(
                            name=secret_name,
                            # optional=True
                        )
                    )
                    for secret_name in secret_names
                ],
                resources=kubernetes_sdk.V1ResourceRequirements(
                    # NOTE: base resources for this are kept to a minimum to
                    # save on running costs.
                    requests={
                        "cpu": "200m",
                        "memory": "100Mi",
                    },
                    limits={
                        "cpu": "200m",
                        "memory": "500Mi",
                    },
                ),
            ).to_dict()
        )

    # create lifecycle hooks from deco
    hooks = []
    for success_fn_name in deco.success_hooks:
        hooks.append(
            ContainerHook(
                name=f"success-{success_fn_name.replace('_', '-')}",
                container=_container(cmds=_cmd(success_fn_name)),
                service_account_name=resources["service_account"],
                on_success=True,
            )
        )

    for error_fn_name in deco.error_hooks:
        hooks.append(
            ContainerHook(
                name=f"error-{error_fn_name.replace('_', '-')}",
                service_account_name=resources["service_account"],
                container=_container(cmds=_cmd(error_fn_name)),
                on_error=True,
            )
        )

    return hooks


def _exit_hook_templates(self):
    """Return the extra exit-hook templates (error-message capture only)."""
    templates = []
    if self.enable_error_msg_capture:
        templates.extend(self._error_msg_capture_hook_templates())

    return templates
- % self.flow.name, - "custom_details": { - "Flow": self.flow.name, - "Run ID": "argo-{{workflow.name}}", - }, + return HttpExitHook( + name="notify-pager-duty-on-error", + method="POST", + url="https://events.pagerduty.com/v2/enqueue", + headers={"Content-Type": "application/json"}, + body=json.dumps( + { + "event_action": "trigger", + "routing_key": self.notify_pager_duty_integration_key, + # "dedup_key": self.flow.name, # TODO: Do we need deduplication? + "payload": { + "source": "{{workflow.name}}", + "severity": "info", + "summary": "Metaflow run %s/argo-{{workflow.name}} failed!" + % self.flow.name, + "custom_details": { + "Flow": self.flow.name, + "Run ID": "argo-{{workflow.name}}", }, - "links": self._pager_duty_notification_links(), - } - ) - ) + }, + "links": self._pager_duty_notification_links(), + } + ), + on_error=True, ) def _incident_io_alert_template(self): @@ -2549,50 +2617,52 @@ def _incident_io_alert_template(self): "Creating alerts for errors requires a alert source config ID." ) ui_links = self._incident_io_ui_urls_for_run() - return Template("notify-incident-io-on-error").http( - Http("POST") - .url( + return HttpExitHook( + name="notify-incident-io-on-error", + method="POST", + url=( "https://api.incident.io/v2/alert_events/http/%s" % self.incident_io_alert_source_config_id - ) - .header("Content-Type", "application/json") - .header("Authorization", "Bearer %s" % self.notify_incident_io_api_key) - .body( - json.dumps( - { - "idempotency_key": "argo-{{workflow.name}}", # use run id to deduplicate alerts. - "status": "firing", - "title": "Flow %s has failed." 
% self.flow.name, - "description": "Metaflow run {run_pathspec} failed!{urls}".format( - run_pathspec="%s/argo-{{workflow.name}}" % self.flow.name, - urls=( - "\n\nSee details for the run at:\n\n" - + "\n\n".join(ui_links) - if ui_links - else "" - ), - ), - "source_url": ( - "%s/%s/%s" - % ( - UI_URL.rstrip("/"), - self.flow.name, - "argo-{{workflow.name}}", - ) - if UI_URL - else None + ), + headers={ + "Content-Type": "application/json", + "Authorization": "Bearer %s" % self.notify_incident_io_api_key, + }, + body=json.dumps( + { + "idempotency_key": "argo-{{workflow.name}}", # use run id to deduplicate alerts. + "status": "firing", + "title": "Flow %s has failed." % self.flow.name, + "description": "Metaflow run {run_pathspec} failed!{urls}".format( + run_pathspec="%s/argo-{{workflow.name}}" % self.flow.name, + urls=( + "\n\nSee details for the run at:\n\n" + + "\n\n".join(ui_links) + if ui_links + else "" ), - "metadata": { - **(self.incident_io_metadata or {}), - **{ - "run_status": "failed", - "flow_name": self.flow.name, - "run_id": "argo-{{workflow.name}}", - }, + ), + "source_url": ( + "%s/%s/%s" + % ( + UI_URL.rstrip("/"), + self.flow.name, + "argo-{{workflow.name}}", + ) + if UI_URL + else None + ), + "metadata": { + **(self.incident_io_metadata or {}), + **{ + "run_status": "failed", + "flow_name": self.flow.name, + "run_id": "argo-{{workflow.name}}", }, - } - ) - ) + }, + } + ), + on_error=True, ) def _incident_io_change_template(self): @@ -2603,50 +2673,52 @@ def _incident_io_change_template(self): "Creating alerts for successes requires an alert source config ID." 
) ui_links = self._incident_io_ui_urls_for_run() - return Template("notify-incident-io-on-success").http( - Http("POST") - .url( + return HttpExitHook( + name="notify-incident-io-on-success", + method="POST", + url=( "https://api.incident.io/v2/alert_events/http/%s" % self.incident_io_alert_source_config_id - ) - .header("Content-Type", "application/json") - .header("Authorization", "Bearer %s" % self.notify_incident_io_api_key) - .body( - json.dumps( - { - "idempotency_key": "argo-{{workflow.name}}", # use run id to deduplicate alerts. - "status": "firing", - "title": "Flow %s has succeeded." % self.flow.name, - "description": "Metaflow run {run_pathspec} succeeded!{urls}".format( - run_pathspec="%s/argo-{{workflow.name}}" % self.flow.name, - urls=( - "\n\nSee details for the run at:\n\n" - + "\n\n".join(ui_links) - if ui_links - else "" - ), - ), - "source_url": ( - "%s/%s/%s" - % ( - UI_URL.rstrip("/"), - self.flow.name, - "argo-{{workflow.name}}", - ) - if UI_URL - else None + ), + headers={ + "Content-Type": "application/json", + "Authorization": "Bearer %s" % self.notify_incident_io_api_key, + }, + body=json.dumps( + { + "idempotency_key": "argo-{{workflow.name}}", # use run id to deduplicate alerts. + "status": "firing", + "title": "Flow %s has succeeded." 
% self.flow.name, + "description": "Metaflow run {run_pathspec} succeeded!{urls}".format( + run_pathspec="%s/argo-{{workflow.name}}" % self.flow.name, + urls=( + "\n\nSee details for the run at:\n\n" + + "\n\n".join(ui_links) + if ui_links + else "" ), - "metadata": { - **(self.incident_io_metadata or {}), - **{ - "run_status": "succeeded", - "flow_name": self.flow.name, - "run_id": "argo-{{workflow.name}}", - }, + ), + "source_url": ( + "%s/%s/%s" + % ( + UI_URL.rstrip("/"), + self.flow.name, + "argo-{{workflow.name}}", + ) + if UI_URL + else None + ), + "metadata": { + **(self.incident_io_metadata or {}), + **{ + "run_status": "succeeded", + "flow_name": self.flow.name, + "run_id": "argo-{{workflow.name}}", }, - } - ) - ) + }, + } + ), + on_success=True, ) def _incident_io_ui_urls_for_run(self): @@ -2671,27 +2743,27 @@ def _pager_duty_change_template(self): # https://developer.pagerduty.com/docs/ZG9jOjExMDI5NTgy-send-a-change-event if self.notify_pager_duty_integration_key is None: return None - return Template("notify-pager-duty-on-success").http( - Http("POST") - .url("https://events.pagerduty.com/v2/change/enqueue") - .header("Content-Type", "application/json") - .body( - json.dumps( - { - "routing_key": self.notify_pager_duty_integration_key, - "payload": { - "summary": "Metaflow run %s/argo-{{workflow.name}} Succeeded" - % self.flow.name, - "source": "{{workflow.name}}", - "custom_details": { - "Flow": self.flow.name, - "Run ID": "argo-{{workflow.name}}", - }, + return HttpExitHook( + name="notify-pager-duty-on-success", + method="POST", + url="https://events.pagerduty.com/v2/change/enqueue", + headers={"Content-Type": "application/json"}, + body=json.dumps( + { + "routing_key": self.notify_pager_duty_integration_key, + "payload": { + "summary": "Metaflow run %s/argo-{{workflow.name}} Succeeded" + % self.flow.name, + "source": "{{workflow.name}}", + "custom_details": { + "Flow": self.flow.name, + "Run ID": "argo-{{workflow.name}}", }, - "links": 
self._pager_duty_notification_links(), - } - ) - ) + }, + "links": self._pager_duty_notification_links(), + } + ), + on_success=True, ) def _pager_duty_notification_links(self): @@ -2795,8 +2867,12 @@ def _slack_error_template(self): blocks = self._get_slack_blocks(message) payload = {"text": message, "blocks": blocks} - return Template("notify-slack-on-error").http( - Http("POST").url(self.notify_slack_webhook_url).body(json.dumps(payload)) + return HttpExitHook( + name="notify-slack-on-error", + method="POST", + url=self.notify_slack_webhook_url, + body=json.dumps(payload), + on_error=True, ) def _slack_success_template(self): @@ -2811,8 +2887,12 @@ def _slack_success_template(self): blocks = self._get_slack_blocks(message) payload = {"text": message, "blocks": blocks} - return Template("notify-slack-on-success").http( - Http("POST").url(self.notify_slack_webhook_url).body(json.dumps(payload)) + return HttpExitHook( + name="notify-slack-on-success", + method="POST", + url=self.notify_slack_webhook_url, + body=json.dumps(payload), + on_success=True, ) def _heartbeat_daemon_template(self): @@ -4179,57 +4259,3 @@ def to_json(self): def __str__(self): return json.dumps(self.payload, indent=4) - - -class Http(object): - # https://argoproj.github.io/argo-workflows/fields/#http - - def __init__(self, method): - tree = lambda: defaultdict(tree) - self.payload = tree() - self.payload["method"] = method - self.payload["headers"] = [] - - def header(self, header, value): - self.payload["headers"].append({"name": header, "value": value}) - return self - - def body(self, body): - self.payload["body"] = str(body) - return self - - def url(self, url): - self.payload["url"] = url - return self - - def success_condition(self, success_condition): - self.payload["successCondition"] = success_condition - return self - - def to_json(self): - return self.payload - - def __str__(self): - return json.dumps(self.payload, indent=4) - - -class LifecycleHook(object): - # 
from metaflow.decorators import FlowDecorator
from metaflow.exception import MetaflowException


class ExitHookDecorator(FlowDecorator):
    """Flow-level decorator declaring functions to run when a run finishes.

    Attributes (decorator options)
    ------------------------------
    image : str, optional
        Container image for the hook; ``None`` leaves the choice to the
        deployer.
    on_success : list of callables
        Functions to run after the workflow succeeded.
    on_error : list of callables
        Functions to run after the workflow failed or errored.

    Only the function *names* are recorded (``success_hooks`` /
    ``error_hooks``), so every hook has to be a named function.
    """

    name = "exit_hook"
    allow_multiple = True

    defaults = {
        "image": None,
        "on_success": [],
        "on_error": [],
    }

    def flow_init(
        self, flow, graph, environment, flow_datastore, metadata, logger, echo, options
    ):
        """Validate the options and record the hook function names.

        Raises
        ------
        MetaflowException
            If neither ``on_success`` nor ``on_error`` is given, or if an
            option holds something other than named functions.
        """
        self.success_hooks = self._hook_names("on_success")
        self.error_hooks = self._hook_names("on_error")

        if not self.success_hooks and not self.error_hooks:
            raise MetaflowException(
                "Choose at least one of the options on_success/on_error"
            )

    def _hook_names(self, option):
        """Return the function names listed under decorator option `option`."""
        value = self.attributes[option]
        if value is None:
            return []
        # Be lenient with a single function passed without a surrounding list.
        if callable(value):
            value = [value]
        if not isinstance(value, (list, tuple)):
            raise MetaflowException(
                "@exit_hook option *%s* must be a list of functions." % option
            )

        names = []
        for fn in value:
            fn_name = getattr(fn, "__name__", None)
            # Only the name is recorded, so anonymous callables (lambdas) or
            # non-callables cannot be used as hooks.
            if not callable(fn) or not fn_name or not fn_name.isidentifier():
                raise MetaflowException(
                    "@exit_hook option *%s* only accepts named functions, got %r."
                    % (option, fn)
                )
            names.append(fn_name)
        return names
from collections import defaultdict
import json
from typing import Dict, List, Optional


def _tree():
    """Return a nested ``defaultdict`` that creates sub-dicts on first access."""
    return defaultdict(_tree)


class JsonSerializable(object):
    """Mixin for builders that keep their Argo spec in ``self.payload``."""

    def to_json(self):
        """Return the underlying payload mapping (not a string)."""
        return self.payload

    def __str__(self):
        return json.dumps(self.payload, indent=4)


class _LifecycleHook(JsonSerializable):
    # https://argoproj.github.io/argo-workflows/fields/#lifecyclehook

    def __init__(self, name):
        # `name` is the key under which the hook is registered; it is kept on
        # the object and is not part of the payload.
        self.name = name
        self.payload = _tree()

    def expression(self, expression):
        self.payload["expression"] = str(expression)
        return self

    def template(self, template):
        self.payload["template"] = template
        return self


class _Template(JsonSerializable):
    # https://argoproj.github.io/argo-workflows/fields/#template

    def __init__(self, name):
        self.name = name
        self.payload = _tree()
        self.payload["name"] = name

    def http(self, http):
        self.payload["http"] = http.to_json()
        return self

    def script(self, script):
        self.payload["script"] = script.to_json()
        return self

    def container(self, container):
        self.payload["container"] = container
        return self

    def service_account_name(self, service_account_name):
        self.payload["serviceAccountName"] = service_account_name
        return self


class Hook(object):
    """
    Abstraction for Argo Workflows exit hooks.
    A hook consists of a Template, and one or more LifecycleHooks that trigger the template
    """

    template: "_Template"
    lifecycle_hooks: List["_LifecycleHook"]

    @staticmethod
    def _status_hooks(name, template_name, on_success, on_error):
        """Return the lifecycle hooks that trigger `template_name`.

        ``on_success`` restricts the hook to succeeded workflows and
        ``on_error`` to failed or errored ones; with neither flag the hook
        carries no expression.

        Raises
        ------
        ValueError
            If both ``on_success`` and ``on_error`` are set.
        """
        if on_success and on_error:
            raise ValueError("Set only one of the on_success/on_error at a time.")

        hook = _LifecycleHook(name)
        if on_success:
            hook.expression("workflow.status == 'Succeeded'")
        elif on_error:
            hook.expression(
                "workflow.status == 'Error' || workflow.status == 'Failed'"
            )
        return [hook.template(template_name)]


class _HttpSpec(JsonSerializable):
    # https://argoproj.github.io/argo-workflows/fields/#http

    def __init__(self, method):
        self.payload = _tree()
        self.payload["method"] = method
        self.payload["headers"] = []

    def header(self, header, value):
        self.payload["headers"].append({"name": header, "value": value})
        return self

    def body(self, body):
        self.payload["body"] = str(body)
        return self

    def url(self, url):
        self.payload["url"] = url
        return self

    def success_condition(self, success_condition):
        self.payload["successCondition"] = success_condition
        return self


def _http_spec(method, url, headers=None, body=None):
    """Build an ``_HttpSpec`` for `method`/`url` with optional headers and body.

    A ``str`` body is taken to be serialized already and is sent verbatim;
    any other value is JSON-encoded. Encoding an already serialized string a
    second time would turn the JSON document into a quoted JSON string.
    """
    http = _HttpSpec(method).url(url)
    if headers is not None:
        for header, value in headers.items():
            http.header(header, value)

    if body is not None:
        http.body(body if isinstance(body, str) else json.dumps(body))
    return http


# HTTP hook
class HttpExitHook(Hook):
    """Hook that issues one HTTP request when its lifecycle condition holds.

    ``body`` may be an already serialized string or a JSON-serializable
    object. Set at most one of ``on_success`` / ``on_error``.
    """

    def __init__(
        self,
        name,
        url,
        method="GET",
        headers=None,
        body=None,
        on_success=False,
        on_error=False,
    ):
        self.template = _Template(name)
        self.template.http(_http_spec(method, url, headers, body))
        self.lifecycle_hooks = self._status_hooks(
            name, self.template.name, on_success, on_error
        )


class ExitHookHack(Hook):
    # Warning: terrible hack to workaround a bug in Argo Workflow where the
    #          templates listed above do not execute unless there is an
    #          explicit exit hook. as and when this bug is patched, we should
    #          remove this effectively no-op template.
    # Note: We use the Http template because changing this to an actual no-op container had the side-effect of
    # leaving LifecycleHooks in a pending state even when they have finished execution.
    def __init__(
        self,
        url,
        headers=None,
        body=None,
    ):
        self.template = _Template("exit-hook-hack")
        http = _http_spec("GET", url, headers, body)
        # Always report success, whatever the endpoint answers.
        http.success_condition("true == true")
        self.template.http(http)

        # add an expressionless lifecycle hook
        self.lifecycle_hooks = [_LifecycleHook("exit").template("exit-hook-hack")]


class ContainerHook(Hook):
    """Hook that runs a container when its lifecycle condition holds.

    ``container`` is the container spec as a mapping. Set at most one of
    ``on_success`` / ``on_error``.
    """

    def __init__(
        self,
        name: str,
        container: Dict,
        service_account_name: Optional[str] = None,
        on_success: bool = False,
        on_error: bool = False,
    ):
        self.template = _Template(name)

        if service_account_name is not None:
            self.template.service_account_name(service_account_name)

        self.template.container(container)

        self.lifecycle_hooks = self._status_hooks(
            name, self.template.name, on_success, on_error
        )