feature: custom annotations revisited #1568

Merged Dec 20, 2024 · 18 commits

2 changes: 2 additions & 0 deletions metaflow/metaflow_config.py
@@ -355,6 +355,8 @@
KUBERNETES_SECRETS = from_conf("KUBERNETES_SECRETS", "")
# Default labels for kubernetes pods
KUBERNETES_LABELS = from_conf("KUBERNETES_LABELS", "")
# Default annotations for kubernetes pods
KUBERNETES_ANNOTATIONS = from_conf("KUBERNETES_ANNOTATIONS", "")
# Default GPU vendor to use by K8S jobs created by Metaflow (supports nvidia, amd)
KUBERNETES_GPU_VENDOR = from_conf("KUBERNETES_GPU_VENDOR", "nvidia")
# Default container image for K8S
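For context, the new KUBERNETES_ANNOTATIONS value follows the same comma-separated key=value format as KUBERNETES_LABELS above it. A minimal sketch of how such a string can be parsed with the parse_kube_keyvalue_list helper touched in this PR (the annotation keys below are hypothetical, not part of the diff):

from metaflow.plugins.kubernetes.kube_utils import parse_kube_keyvalue_list

# Hypothetical config value; keys and values are illustrative only.
raw = "example.org/team=data-platform,example.org/cost-center=1234"

# requires_both=False tolerates bare keys (their value becomes None),
# matching how KUBERNETES_LABELS was parsed before this change.
annotations = parse_kube_keyvalue_list(raw.split(","), False)
# {'example.org/team': 'data-platform', 'example.org/cost-center': '1234'}
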
182 changes: 83 additions & 99 deletions metaflow/plugins/argo/argo_workflows.py
@@ -39,7 +39,6 @@
DEFAULT_SECRETS_BACKEND_TYPE,
GCP_SECRET_MANAGER_PREFIX,
KUBERNETES_FETCH_EC2_METADATA,
KUBERNETES_LABELS,
KUBERNETES_NAMESPACE,
KUBERNETES_NODE_SELECTOR,
KUBERNETES_SANDBOX_INIT_SCRIPT,
@@ -54,10 +53,7 @@
from metaflow.mflog import BASH_SAVE_LOGS, bash_capture_logs, export_mflog_env_vars
from metaflow.parameters import deploy_time_eval
from metaflow.plugins.kubernetes.kube_utils import qos_requests_and_limits
from metaflow.plugins.kubernetes.kubernetes import (
parse_kube_keyvalue_list,
validate_kube_labels,
)

from metaflow.plugins.kubernetes.kubernetes_jobsets import KubernetesArgoJobSet
from metaflow.unbounded_foreach import UBF_CONTROL, UBF_TASK
from metaflow.user_configs.config_options import ConfigInput
@@ -173,7 +169,8 @@ def __init__(
self.triggers, self.trigger_options = self._process_triggers()
self._schedule, self._timezone = self._get_schedule()

self.kubernetes_labels = self._get_kubernetes_labels()
self._base_labels = self._base_kubernetes_labels()
self._base_annotations = self._base_kubernetes_annotations()
self._workflow_template = self._compile_workflow_template()
self._sensor = self._compile_sensor()

@@ -324,18 +321,42 @@ def trigger(cls, name, parameters=None):
except Exception as e:
raise ArgoWorkflowsException(str(e))

@staticmethod
def _get_kubernetes_labels():
def _base_kubernetes_labels(self):
"""
Get Kubernetes labels from environment variable.
Parses the string into a dict and validates that values adhere to Kubernetes restrictions.
Get shared Kubernetes labels for Argo resources.
"""
if not KUBERNETES_LABELS:
return {}
env_labels = KUBERNETES_LABELS.split(",")
env_labels = parse_kube_keyvalue_list(env_labels, False)
validate_kube_labels(env_labels)
return env_labels
# TODO: Add configuration through an environment variable or Metaflow config in the future if required.
labels = {"app.kubernetes.io/part-of": "metaflow"}

return labels

def _base_kubernetes_annotations(self):
"""
Get shared Kubernetes annotations for Argo resources.
"""
from datetime import datetime, timezone

# TODO: Add configuration through an environment variable or Metaflow config in the future if required.
# base annotations
annotations = {
"metaflow/production_token": self.production_token,
"metaflow/owner": self.username,
"metaflow/user": "argo-workflows",
"metaflow/flow_name": self.flow.name,
"metaflow/deployment_timestamp": str(
datetime.now(timezone.utc).isoformat()
),
}

if current.get("project_name"):
annotations.update(
{
"metaflow/project_name": current.project_name,
"metaflow/branch_name": current.branch_name,
"metaflow/project_flow_name": current.project_flow_name,
}
)
return annotations
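
For illustration, the base annotations for a hypothetical @project(name="demo") flow deployed by user "jane" would look roughly like this (all values below are made up):

{
    "metaflow/production_token": "helloflow-0-abcd",
    "metaflow/owner": "jane",
    "metaflow/user": "argo-workflows",
    "metaflow/flow_name": "HelloFlow",
    "metaflow/deployment_timestamp": "2024-12-20T12:00:00+00:00",
    # present only when the flow uses the @project decorator:
    "metaflow/project_name": "demo",
    "metaflow/branch_name": "user.jane",
    "metaflow/project_flow_name": "demo.user.jane.HelloFlow",
}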

def _get_schedule(self):
schedule = self.flow._flow_decorators.get("schedule")
@@ -676,18 +697,7 @@ def _compile_workflow_template(self):
# generate container templates at the top level (in WorkflowSpec) and maintain
# references to them within the DAGTask.

from datetime import datetime, timezone

annotations = {
"metaflow/production_token": self.production_token,
"metaflow/owner": self.username,
"metaflow/user": "argo-workflows",
"metaflow/flow_name": self.flow.name,
"metaflow/deployment_timestamp": str(
datetime.now(timezone.utc).isoformat()
),
}

annotations = {}
if self._schedule is not None:
# timezone is an optional field and json dumps on None will result in null
# hence configuring it to an empty string
@@ -699,15 +709,6 @@
if self.parameters:
annotations.update({"metaflow/parameters": json.dumps(self.parameters)})

if current.get("project_name"):
annotations.update(
{
"metaflow/project_name": current.project_name,
"metaflow/branch_name": current.branch_name,
"metaflow/project_flow_name": current.project_flow_name,
}
)

# Some more annotations to populate the Argo UI nicely
if self.tags:
annotations.update({"metaflow/tags": json.dumps(self.tags)})
@@ -755,9 +756,10 @@
# is released, we should be able to support multi-namespace /
# multi-cluster scheduling.
.namespace(KUBERNETES_NAMESPACE)
.label("app.kubernetes.io/name", "metaflow-flow")
.label("app.kubernetes.io/part-of", "metaflow")
.annotations(annotations)
.annotations(self._base_annotations)
.labels(self._base_labels)
.label("app.kubernetes.io/name", "metaflow-flow")
)
.spec(
WorkflowSpec()
@@ -787,10 +789,14 @@
# Set workflow metadata
.workflow_metadata(
Metadata()
.labels(self._base_labels)
.label("app.kubernetes.io/name", "metaflow-run")
.label("app.kubernetes.io/part-of", "metaflow")
.annotations(
{**annotations, **{"metaflow/run_id": "argo-{{workflow.name}}"}}
{
**annotations,
**self._base_annotations,
**{"metaflow/run_id": "argo-{{workflow.name}}"},
}
)
# TODO: Set dynamic labels using labels_from. Ideally, we would
# want to expose run_id as a label. It's easy to add labels,
@@ -823,10 +829,10 @@
# Set common pod metadata.
.pod_metadata(
Metadata()
.labels(self._base_labels)
.label("app.kubernetes.io/name", "metaflow-task")
.label("app.kubernetes.io/part-of", "metaflow")
.annotations(annotations)
.labels(self.kubernetes_labels)

Contributor:

@saikonen is there a reason that you remove these labels from the podMetadata section and instead repeat them in each task?

Collaborator (author):

the podMetadata applying to all pods from the template wasn't a good fit for setting task-pod-specific labels, e.g.

@kubernetes(labels={"test-label": "first-step"})
@step
def first_step(self):
    ...

@kubernetes(labels={"test-label": "second-step"})
@step
def second_step(self):
    ...

as on the template level we can't make a choice which deco to pull the 'common' labels from.

The previous implementation did treat the common labels from the environment variable as shared, and set them in the podMetadata. Functionally there shouldn't be any difference though whether those are set here, or individually per-pod?

Collaborator (author):

Is there an issue with repeating common labels on the pod level?

We can extend on this, as there is a todo for the future to provide separate label/annotation global env configuration for argo-workflows which would enable labeling all relevant resources (templates, sensors etc.)

.annotations(self._base_annotations)
)
# Set the entrypoint to flow name
.entrypoint(self.flow.name)
@@ -1895,15 +1901,7 @@ def _container_templates(self):
# twice, but due to issues with variable substitution, we will have to
# live with this routine.
if node.parallel_step:
# Explicitly add the task-id-hint label. This is important because this label
# is returned as an Output parameter of this step and is used subsequently as
# an input in the join step.
kubernetes_labels = self.kubernetes_labels.copy()
jobset_name = "{{inputs.parameters.jobset-name}}"
kubernetes_labels["task_id_entropy"] = (
"{{inputs.parameters.task-id-entropy}}"
)
kubernetes_labels["num_parallel"] = "{{inputs.parameters.num-parallel}}"
jobset = KubernetesArgoJobSet(
kubernetes_sdk=kubernetes_sdk,
name=jobset_name,
@@ -1959,8 +1957,22 @@
for k, v in env.items():
jobset.environment_variable(k, v)

for k, v in kubernetes_labels.items():
jobset.label(k, v)
# Set labels. Do not allow user-specified task labels to override internal ones.
#
# Explicitly add the task-id-hint label. This is important because this label
# is returned as an Output parameter of this step and is used subsequently as
# an input in the join step.
kubernetes_labels = {
"task_id_entropy": "{{inputs.parameters.task-id-entropy}}",
"num_parallel": "{{inputs.parameters.num-parallel}}",
}
jobset.labels(
{
**resources["labels"],
**self._base_labels,
**kubernetes_labels,
}
)
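
The merge order above matters: with Python dict unpacking, later entries win, so user-supplied labels from resources["labels"] cannot clobber the base or jobset-internal labels. A minimal sketch of that precedence (label keys here are hypothetical):

user_labels = {"task_id_entropy": "user-override", "example/team": "ml"}
base_labels = {"app.kubernetes.io/part-of": "metaflow"}
internal_labels = {"task_id_entropy": "{{inputs.parameters.task-id-entropy}}"}

merged = {**user_labels, **base_labels, **internal_labels}
assert merged["task_id_entropy"] == "{{inputs.parameters.task-id-entropy}}"
assert merged["example/team"] == "ml"  # non-colliding user labels survive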

jobset.environment_variable(
"MF_MASTER_ADDR", jobset.jobset_control_addr
@@ -1989,27 +2001,23 @@
"TASK_ID_SUFFIX": "metadata.annotations['jobset.sigs.k8s.io/job-index']",
}
)

# Set annotations. Do not allow user-specified task-specific annotations to override internal ones.
annotations = {
# setting annotations explicitly as they won't be
# passed down from WorkflowTemplate level
"metaflow/step_name": node.name,
"metaflow/attempt": str(retry_count),
"metaflow/run_id": run_id,
"metaflow/production_token": self.production_token,
"metaflow/owner": self.username,
"metaflow/user": "argo-workflows",
"metaflow/flow_name": self.flow.name,
}
if current.get("project_name"):
annotations.update(
{
"metaflow/project_name": current.project_name,
"metaflow/branch_name": current.branch_name,
"metaflow/project_flow_name": current.project_flow_name,
}
)
for k, v in annotations.items():
jobset.annotation(k, v)

jobset.annotations(
{
**resources["annotations"],
**self._base_annotations,
**annotations,
}
)

jobset.control.replicas(1)
jobset.worker.replicas("{{=asInt(inputs.parameters.workerCount)}}")
@@ -2066,13 +2074,16 @@ def _container_templates(self):
minutes_between_retries=minutes_between_retries,
)
.metadata(
ObjectMeta().annotation("metaflow/step_name", node.name)
ObjectMeta()
.annotation("metaflow/step_name", node.name)
# Unfortunately, we can't set the task_id since it is generated
# inside the pod. However, it can be inferred from the annotation
# set by argo-workflows - `workflows.argoproj.io/outputs` - refer to
# the field 'task-id' in 'parameters'
# .annotation("metaflow/task_id", ...)
.annotation("metaflow/attempt", retry_count)
.annotations(resources["annotations"])
.labels(resources["labels"])
)
# Set emptyDir volume for state management
.empty_dir_volume("out")
@@ -2840,44 +2851,16 @@ def _compile_sensor(self):
"sdk (https://pypi.org/project/kubernetes/) first."
)

labels = {"app.kubernetes.io/part-of": "metaflow"}

annotations = {
"metaflow/production_token": self.production_token,
"metaflow/owner": self.username,
"metaflow/user": "argo-workflows",
"metaflow/flow_name": self.flow.name,
}
if current.get("project_name"):
annotations.update(
{
"metaflow/project_name": current.project_name,
"metaflow/branch_name": current.branch_name,
"metaflow/project_flow_name": current.project_flow_name,
}
)

# Useful to paint the UI
trigger_annotations = {
"metaflow/triggered_by": json.dumps(
[
{key: trigger.get(key) for key in ["name", "type"]}
for trigger in self.triggers
]
)
}

return (
Sensor()
.metadata(
# Sensor metadata.
ObjectMeta()
.name(ArgoWorkflows._sensor_name(self.name))
.namespace(KUBERNETES_NAMESPACE)
.labels(self._base_labels)
.label("app.kubernetes.io/name", "metaflow-sensor")
.label("app.kubernetes.io/part-of", "metaflow")
.labels(self.kubernetes_labels)
.annotations(annotations)
.annotations(self._base_annotations)
)
.spec(
SensorSpec().template(
@@ -2887,7 +2870,7 @@
ObjectMeta()
.label("app.kubernetes.io/name", "metaflow-sensor")
.label("app.kubernetes.io/part-of", "metaflow")
.annotations(annotations)
.annotations(self._base_annotations)
)
.container(
# Run sensor in guaranteed QoS. The sensor isn't doing a lot
@@ -2934,6 +2917,7 @@
"metadata": {
"generateName": "%s-" % self.name,
"namespace": KUBERNETES_NAMESPACE,
# Useful to paint the UI
"annotations": {
"metaflow/triggered_by": json.dumps(
[
56 changes: 55 additions & 1 deletion metaflow/plugins/kubernetes/kube_utils.py
@@ -1,7 +1,14 @@
from metaflow.exception import CommandException
import re
from typing import Dict, List, Optional
from metaflow.exception import CommandException, MetaflowException
from metaflow.util import get_username, get_latest_run_id


# avoid circular import by having the exception class contained here
class KubernetesException(MetaflowException):
headline = "Kubernetes error"


def parse_cli_options(flow_name, run_id, user, my_runs, echo):
if user and my_runs:
raise CommandException("--user and --my-runs are mutually exclusive.")
@@ -52,3 +59,50 @@ def qos_requests_and_limits(qos: str, cpu: int, memory: int, storage: int):
# TODO: Add support for BestEffort once there is a use case for it.
# BestEffort - no limit or requests for cpu/memory
return qos_requests, qos_limits


def validate_kube_labels(
labels: Optional[Dict[str, Optional[str]]],
) -> bool:
"""Validate label values.

This validates the kubernetes label values. It does not validate the keys.
Ideally, keys should be static and also the validation rules for keys are
more complex than those for values. For full validation rules, see:

https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set
"""

def validate_label(s: Optional[str]):
regex_match = r"^(([A-Za-z0-9][-A-Za-z0-9_.]{0,61})?[A-Za-z0-9])?$"
if not s:
# allow empty label
return True
if not re.search(regex_match, s):
raise KubernetesException(
'Invalid value: "%s"\n'
"A valid label must be an empty string or one that\n"
" - Consist of alphanumeric, '-', '_' or '.' characters\n"
" - Begins and ends with an alphanumeric character\n"
" - Is at most 63 characters" % s
)
return True

return all([validate_label(v) for v in labels.values()]) if labels else True
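
A quick usage sketch of the validator above (label values are hypothetical):

validate_kube_labels({"example/team": "ml", "empty-ok": ""})  # True

# A value that does not begin and end with an alphanumeric character
# fails the regex and raises KubernetesException.
try:
    validate_kube_labels({"example/team": "-bad-"})
except KubernetesException as exc:
    print(exc)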


def parse_kube_keyvalue_list(items: List[str], requires_both: bool = True):
try:
ret = {}
for item_str in items:
item = item_str.split("=", 1)
if requires_both:
item[1] # raise IndexError
if str(item[0]) in ret:
raise KubernetesException("Duplicate key found: %s" % str(item[0]))
ret[str(item[0])] = str(item[1]) if len(item) > 1 else None
return ret
except KubernetesException as e:
raise e
except (AttributeError, IndexError):
raise KubernetesException("Unable to parse kubernetes list: %s" % items)
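
And a sketch of the parser's behavior under the same assumptions:

# Only the first '=' splits key from value, so values may contain '='.
parse_kube_keyvalue_list(["a=1", "b=x=y"])     # {'a': '1', 'b': 'x=y'}

# With requires_both=False a bare key maps to None.
parse_kube_keyvalue_list(["bare-key"], False)  # {'bare-key': None}

# A missing value (with the default requires_both=True) or a duplicate
# key raises KubernetesException.
for bad in (["bare-key"], ["a=1", "a=2"]):
    try:
        parse_kube_keyvalue_list(bad)
    except KubernetesException as exc:
        print(exc)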