From 768b92b56352f405639a24aaf06b1caa1a9ac76b Mon Sep 17 00:00:00 2001 From: savin Date: Wed, 18 Oct 2023 12:06:24 -0700 Subject: [PATCH] introduce argo-workflows status --- metaflow/plugins/argo/argo_client.py | 6 ++--- metaflow/plugins/argo/argo_events.py | 2 +- metaflow/plugins/argo/argo_workflows.py | 30 +++++++++++++++------ metaflow/plugins/argo/argo_workflows_cli.py | 25 +++++++++++++++-- 4 files changed, 49 insertions(+), 14 deletions(-) diff --git a/metaflow/plugins/argo/argo_client.py b/metaflow/plugins/argo/argo_client.py index 6e6a809654b..ffa082ca130 100644 --- a/metaflow/plugins/argo/argo_client.py +++ b/metaflow/plugins/argo/argo_client.py @@ -168,7 +168,7 @@ def delete_workflow_template(self, name): json.loads(e.body)["message"] if e.body is not None else e.reason ) - def terminate_workflow(self, run_id): + def terminate_workflow(self, name): client = self._client.get() try: workflow = client.CustomObjectsApi().get_namespaced_custom_object( @@ -176,7 +176,7 @@ def terminate_workflow(self, run_id): version=self._version, namespace=self._namespace, plural="workflows", - name=run_id, + name=name, ) except client.rest.ApiException as e: raise ArgoClientException( @@ -198,7 +198,7 @@ def terminate_workflow(self, run_id): version=self._version, namespace=self._namespace, plural="workflows", - name=run_id, + name=name, body=body, ) except client.rest.ApiException as e: diff --git a/metaflow/plugins/argo/argo_events.py b/metaflow/plugins/argo/argo_events.py index f2fdfcb4408..7a203506517 100644 --- a/metaflow/plugins/argo/argo_events.py +++ b/metaflow/plugins/argo/argo_events.py @@ -8,8 +8,8 @@ from metaflow.exception import MetaflowException from metaflow.metaflow_config import ( - ARGO_EVENTS_WEBHOOK_URL, ARGO_EVENTS_WEBHOOK_AUTH, + ARGO_EVENTS_WEBHOOK_URL, SERVICE_HEADERS, ) diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index f28b4f443bc..4b64af36cc9 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -7,19 +7,20 @@ from collections import defaultdict from hashlib import sha1 -from metaflow import current, JSONType -from metaflow.includefile import FilePathClass +from metaflow import JSONType, current from metaflow.decorators import flow_decorators from metaflow.exception import MetaflowException +from metaflow.includefile import FilePathClass from metaflow.metaflow_config import ( ARGO_EVENTS_EVENT, ARGO_EVENTS_EVENT_BUS, ARGO_EVENTS_EVENT_SOURCE, - ARGO_EVENTS_SERVICE_ACCOUNT, ARGO_EVENTS_INTERNAL_WEBHOOK_URL, + ARGO_EVENTS_SERVICE_ACCOUNT, + ARGO_EVENTS_WEBHOOK_AUTH, ARGO_WORKFLOWS_ENV_VARS_TO_SKIP, ARGO_WORKFLOWS_KUBERNETES_SECRETS, - ARGO_EVENTS_WEBHOOK_AUTH, + ARGO_WORKFLOWS_UI_URL, AWS_SECRETS_MANAGER_DEFAULT_REGION, AZURE_STORAGE_BLOB_SERVICE_ENDPOINT, CARD_AZUREROOT, @@ -38,15 +39,12 @@ KUBERNETES_SANDBOX_INIT_SCRIPT, KUBERNETES_SECRETS, S3_ENDPOINT_URL, + S3_SERVER_SIDE_ENCRYPTION, SERVICE_HEADERS, SERVICE_INTERNAL_URL, - S3_SERVER_SIDE_ENCRYPTION, UI_URL, - ARGO_WORKFLOWS_UI_URL, ) - from metaflow.metaflow_config_funcs import config_values - from metaflow.mflog import BASH_SAVE_LOGS, bash_capture_logs, export_mflog_env_vars from metaflow.parameters import deploy_time_eval from metaflow.plugins.kubernetes.kubernetes import ( @@ -241,6 +239,22 @@ def terminate(flow_name, name): ) ) + @staticmethod + def get_workflow_status(flow_name, name): + client = ArgoClient(namespace=KUBERNETES_NAMESPACE) + # TODO: Only look for workflows for the specified flow + workflow = client.get_workflow(name) + if workflow: + # return workflow phase for now + status = workflow.get("status", {}).get("phase") + return status + else: + raise ArgoWorkflowsException( + "No execution found for {flow_name}/{run_id} in Argo Workflows.".format( + flow_name=flow_name, run_id=name + ) + ) + @staticmethod def suspend(name): client = ArgoClient(namespace=KUBERNETES_NAMESPACE) diff --git a/metaflow/plugins/argo/argo_workflows_cli.py b/metaflow/plugins/argo/argo_workflows_cli.py index 5c8fdf83888..1ccfc89f95e 100644 --- a/metaflow/plugins/argo/argo_workflows_cli.py +++ b/metaflow/plugins/argo/argo_workflows_cli.py @@ -10,10 +10,10 @@ from metaflow._vendor import click from metaflow.exception import MetaflowException, MetaflowInternalError from metaflow.metaflow_config import ( - SERVICE_VERSION_CHECK, - UI_URL, ARGO_WORKFLOWS_UI_URL, KUBERNETES_NAMESPACE, + SERVICE_VERSION_CHECK, + UI_URL, ) from metaflow.package import MetaflowPackage @@ -787,6 +787,27 @@ def validate_token(name, token_prefix, authorize, instructions_fn=None): return True +@argo_workflows.command(help="Fetch flow execution status on Argo Workflows.") +@click.argument("run-id", required=True, type=str) +@click.pass_obj +def status(obj, run_id): + if not run_id.startswith("argo-"): + raise RunIdMismatch( + "Run IDs for flows executed through Argo Workflows begin with 'argo-'" + ) + obj.echo( + "Fetching status for run *{run_id}* for {flow_name} ...".format( + run_id=run_id, flow_name=obj.flow.name + ), + bold=True, + ) + # Trim prefix from run_id + name = run_id[5:] + status = ArgoWorkflows.get_workflow_status(obj.flow.name, name) + if status is not None: + obj.echo_always(status) + + @argo_workflows.command(help="Terminate flow execution on Argo Workflows.") @click.option( "--authorize",