Skip to content

Commit

Permalink
introduce argo-workflows status (#1600)
Browse files Browse the repository at this point in the history
  • Loading branch information
savingoyal authored Oct 18, 2023
1 parent 099754a commit f1e8e8d
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 14 deletions.
6 changes: 3 additions & 3 deletions metaflow/plugins/argo/argo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,15 @@ def delete_workflow_template(self, name):
json.loads(e.body)["message"] if e.body is not None else e.reason
)

def terminate_workflow(self, run_id):
def terminate_workflow(self, name):
client = self._client.get()
try:
workflow = client.CustomObjectsApi().get_namespaced_custom_object(
group=self._group,
version=self._version,
namespace=self._namespace,
plural="workflows",
name=run_id,
name=name,
)
except client.rest.ApiException as e:
raise ArgoClientException(
Expand All @@ -198,7 +198,7 @@ def terminate_workflow(self, run_id):
version=self._version,
namespace=self._namespace,
plural="workflows",
name=run_id,
name=name,
body=body,
)
except client.rest.ApiException as e:
Expand Down
2 changes: 1 addition & 1 deletion metaflow/plugins/argo/argo_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@

from metaflow.exception import MetaflowException
from metaflow.metaflow_config import (
ARGO_EVENTS_WEBHOOK_URL,
ARGO_EVENTS_WEBHOOK_AUTH,
ARGO_EVENTS_WEBHOOK_URL,
SERVICE_HEADERS,
)

Expand Down
30 changes: 22 additions & 8 deletions metaflow/plugins/argo/argo_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,20 @@
from collections import defaultdict
from hashlib import sha1

from metaflow import current, JSONType
from metaflow.includefile import FilePathClass
from metaflow import JSONType, current
from metaflow.decorators import flow_decorators
from metaflow.exception import MetaflowException
from metaflow.includefile import FilePathClass
from metaflow.metaflow_config import (
ARGO_EVENTS_EVENT,
ARGO_EVENTS_EVENT_BUS,
ARGO_EVENTS_EVENT_SOURCE,
ARGO_EVENTS_SERVICE_ACCOUNT,
ARGO_EVENTS_INTERNAL_WEBHOOK_URL,
ARGO_EVENTS_SERVICE_ACCOUNT,
ARGO_EVENTS_WEBHOOK_AUTH,
ARGO_WORKFLOWS_ENV_VARS_TO_SKIP,
ARGO_WORKFLOWS_KUBERNETES_SECRETS,
ARGO_EVENTS_WEBHOOK_AUTH,
ARGO_WORKFLOWS_UI_URL,
AWS_SECRETS_MANAGER_DEFAULT_REGION,
AZURE_STORAGE_BLOB_SERVICE_ENDPOINT,
CARD_AZUREROOT,
Expand All @@ -38,15 +39,12 @@
KUBERNETES_SANDBOX_INIT_SCRIPT,
KUBERNETES_SECRETS,
S3_ENDPOINT_URL,
S3_SERVER_SIDE_ENCRYPTION,
SERVICE_HEADERS,
SERVICE_INTERNAL_URL,
S3_SERVER_SIDE_ENCRYPTION,
UI_URL,
ARGO_WORKFLOWS_UI_URL,
)

from metaflow.metaflow_config_funcs import config_values

from metaflow.mflog import BASH_SAVE_LOGS, bash_capture_logs, export_mflog_env_vars
from metaflow.parameters import deploy_time_eval
from metaflow.plugins.kubernetes.kubernetes import (
Expand Down Expand Up @@ -241,6 +239,22 @@ def terminate(flow_name, name):
)
)

@staticmethod
def get_workflow_status(flow_name, name):
client = ArgoClient(namespace=KUBERNETES_NAMESPACE)
# TODO: Only look for workflows for the specified flow
workflow = client.get_workflow(name)
if workflow:
# return workflow phase for now
status = workflow.get("status", {}).get("phase")
return status
else:
raise ArgoWorkflowsException(
"No execution found for {flow_name}/{run_id} in Argo Workflows.".format(
flow_name=flow_name, run_id=name
)
)

@staticmethod
def suspend(name):
client = ArgoClient(namespace=KUBERNETES_NAMESPACE)
Expand Down
25 changes: 23 additions & 2 deletions metaflow/plugins/argo/argo_workflows_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
from metaflow._vendor import click
from metaflow.exception import MetaflowException, MetaflowInternalError
from metaflow.metaflow_config import (
SERVICE_VERSION_CHECK,
UI_URL,
ARGO_WORKFLOWS_UI_URL,
KUBERNETES_NAMESPACE,
SERVICE_VERSION_CHECK,
UI_URL,
)
from metaflow.package import MetaflowPackage

Expand Down Expand Up @@ -787,6 +787,27 @@ def validate_token(name, token_prefix, authorize, instructions_fn=None):
return True


@argo_workflows.command(help="Fetch flow execution status on Argo Workflows.")
@click.argument("run-id", required=True, type=str)
@click.pass_obj
def status(obj, run_id):
if not run_id.startswith("argo-"):
raise RunIdMismatch(
"Run IDs for flows executed through Argo Workflows begin with 'argo-'"
)
obj.echo(
"Fetching status for run *{run_id}* for {flow_name} ...".format(
run_id=run_id, flow_name=obj.flow.name
),
bold=True,
)
# Trim prefix from run_id
name = run_id[5:]
status = ArgoWorkflows.get_workflow_status(obj.flow.name, name)
if status is not None:
obj.echo_always(status)


@argo_workflows.command(help="Terminate flow execution on Argo Workflows.")
@click.option(
"--authorize",
Expand Down

0 comments on commit f1e8e8d

Please sign in to comment.