-
Notifications
You must be signed in to change notification settings - Fork 848
feature: custom annotations revisited #1568
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
13978b4
2eaf40d
c481661
67fa2cb
5e4f06f
bf0fc4b
609c7e6
6fda6d0
96133ee
41c6fb0
d453897
8f983a5
f0adec1
31723a8
ecaba3a
253301c
30adda1
e1b7a31
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -39,7 +39,6 @@ | |
DEFAULT_SECRETS_BACKEND_TYPE, | ||
GCP_SECRET_MANAGER_PREFIX, | ||
KUBERNETES_FETCH_EC2_METADATA, | ||
KUBERNETES_LABELS, | ||
KUBERNETES_NAMESPACE, | ||
KUBERNETES_NODE_SELECTOR, | ||
KUBERNETES_SANDBOX_INIT_SCRIPT, | ||
|
@@ -54,10 +53,7 @@ | |
from metaflow.mflog import BASH_SAVE_LOGS, bash_capture_logs, export_mflog_env_vars | ||
from metaflow.parameters import deploy_time_eval | ||
from metaflow.plugins.kubernetes.kube_utils import qos_requests_and_limits | ||
from metaflow.plugins.kubernetes.kubernetes import ( | ||
parse_kube_keyvalue_list, | ||
validate_kube_labels, | ||
) | ||
|
||
from metaflow.plugins.kubernetes.kubernetes_jobsets import KubernetesArgoJobSet | ||
from metaflow.unbounded_foreach import UBF_CONTROL, UBF_TASK | ||
from metaflow.user_configs.config_options import ConfigInput | ||
|
@@ -173,7 +169,8 @@ def __init__( | |
self.triggers, self.trigger_options = self._process_triggers() | ||
self._schedule, self._timezone = self._get_schedule() | ||
|
||
self.kubernetes_labels = self._get_kubernetes_labels() | ||
self._base_labels = self._base_kubernetes_labels() | ||
self._base_annotations = self._base_kubernetes_annotations() | ||
self._workflow_template = self._compile_workflow_template() | ||
self._sensor = self._compile_sensor() | ||
|
||
|
@@ -324,18 +321,42 @@ def trigger(cls, name, parameters=None): | |
except Exception as e: | ||
raise ArgoWorkflowsException(str(e)) | ||
|
||
@staticmethod | ||
def _get_kubernetes_labels(): | ||
def _base_kubernetes_labels(self): | ||
""" | ||
Get Kubernetes labels from environment variable. | ||
Parses the string into a dict and validates that values adhere to Kubernetes restrictions. | ||
Get shared Kubernetes labels for Argo resources. | ||
""" | ||
if not KUBERNETES_LABELS: | ||
return {} | ||
env_labels = KUBERNETES_LABELS.split(",") | ||
env_labels = parse_kube_keyvalue_list(env_labels, False) | ||
validate_kube_labels(env_labels) | ||
return env_labels | ||
# TODO: Add configuration through an environment variable or Metaflow config in the future if required. | ||
labels = {"app.kubernetes.io/part-of": "metaflow"} | ||
|
||
return labels | ||
|
||
def _base_kubernetes_annotations(self): | ||
""" | ||
Get shared Kubernetes annotations for Argo resources. | ||
""" | ||
from datetime import datetime, timezone | ||
|
||
# TODO: Add configuration through an environment variable or Metaflow config in the future if required. | ||
# base annotations | ||
annotations = { | ||
"metaflow/production_token": self.production_token, | ||
"metaflow/owner": self.username, | ||
"metaflow/user": "argo-workflows", | ||
"metaflow/flow_name": self.flow.name, | ||
"metaflow/deployment_timestamp": str( | ||
datetime.now(timezone.utc).isoformat() | ||
), | ||
} | ||
|
||
if current.get("project_name"): | ||
annotations.update( | ||
{ | ||
"metaflow/project_name": current.project_name, | ||
"metaflow/branch_name": current.branch_name, | ||
"metaflow/project_flow_name": current.project_flow_name, | ||
} | ||
) | ||
return annotations | ||
|
||
def _get_schedule(self): | ||
schedule = self.flow._flow_decorators.get("schedule") | ||
|
@@ -676,18 +697,7 @@ def _compile_workflow_template(self): | |
# generate container templates at the top level (in WorkflowSpec) and maintain | ||
# references to them within the DAGTask. | ||
|
||
from datetime import datetime, timezone | ||
|
||
annotations = { | ||
"metaflow/production_token": self.production_token, | ||
"metaflow/owner": self.username, | ||
"metaflow/user": "argo-workflows", | ||
"metaflow/flow_name": self.flow.name, | ||
"metaflow/deployment_timestamp": str( | ||
datetime.now(timezone.utc).isoformat() | ||
), | ||
} | ||
|
||
annotations = {} | ||
if self._schedule is not None: | ||
# timezone is an optional field and json dumps on None will result in null | ||
# hence configuring it to an empty string | ||
|
@@ -699,15 +709,6 @@ def _compile_workflow_template(self): | |
if self.parameters: | ||
annotations.update({"metaflow/parameters": json.dumps(self.parameters)}) | ||
|
||
if current.get("project_name"): | ||
annotations.update( | ||
{ | ||
"metaflow/project_name": current.project_name, | ||
"metaflow/branch_name": current.branch_name, | ||
"metaflow/project_flow_name": current.project_flow_name, | ||
} | ||
) | ||
|
||
# Some more annotations to populate the Argo UI nicely | ||
if self.tags: | ||
annotations.update({"metaflow/tags": json.dumps(self.tags)}) | ||
|
@@ -755,9 +756,10 @@ def _compile_workflow_template(self): | |
# is released, we should be able to support multi-namespace / | ||
# multi-cluster scheduling. | ||
.namespace(KUBERNETES_NAMESPACE) | ||
.label("app.kubernetes.io/name", "metaflow-flow") | ||
.label("app.kubernetes.io/part-of", "metaflow") | ||
.annotations(annotations) | ||
.annotations(self._base_annotations) | ||
.labels(self._base_labels) | ||
.label("app.kubernetes.io/name", "metaflow-flow") | ||
) | ||
.spec( | ||
WorkflowSpec() | ||
|
@@ -787,10 +789,14 @@ def _compile_workflow_template(self): | |
# Set workflow metadata | ||
.workflow_metadata( | ||
Metadata() | ||
.labels(self._base_labels) | ||
.label("app.kubernetes.io/name", "metaflow-run") | ||
.label("app.kubernetes.io/part-of", "metaflow") | ||
.annotations( | ||
{**annotations, **{"metaflow/run_id": "argo-{{workflow.name}}"}} | ||
{ | ||
**annotations, | ||
**self._base_annotations, | ||
**{"metaflow/run_id": "argo-{{workflow.name}}"}, | ||
} | ||
) | ||
# TODO: Set dynamic labels using labels_from. Ideally, we would | ||
# want to expose run_id as a label. It's easy to add labels, | ||
|
@@ -823,10 +829,10 @@ def _compile_workflow_template(self): | |
# Set common pod metadata. | ||
.pod_metadata( | ||
Metadata() | ||
.labels(self._base_labels) | ||
.label("app.kubernetes.io/name", "metaflow-task") | ||
.label("app.kubernetes.io/part-of", "metaflow") | ||
.annotations(annotations) | ||
saikonen marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.labels(self.kubernetes_labels) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @saikonen is there a reason that you remove these labels from the podMetadata section and instead repeat them in each task? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the podMetadata applying to all pods from the template wasn't a good fit for setting task-pod-specific labels, e.g.
as on the template level we can't make a choice which deco to pull the 'common' labels from. The previous implementation did treat the common labels from the environment variable as shared, and set them in the podMetadata. Functionally there shouldn't be any difference though whether those are set here, or individually per-pod? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there an issue with repeating common labels on the pod level? We can extend on this, as there is a todo for the future to provide separate label/annotation global env configuration for argo-workflows which would enable labeling all relevant resources (templates, sensors etc.) |
||
.annotations(self._base_annotations) | ||
) | ||
# Set the entrypoint to flow name | ||
.entrypoint(self.flow.name) | ||
|
@@ -1895,15 +1901,7 @@ def _container_templates(self): | |
# twice, but due to issues with variable substitution, we will have to | ||
# live with this routine. | ||
if node.parallel_step: | ||
# Explicitly add the task-id-hint label. This is important because this label | ||
# is returned as an Output parameter of this step and is used subsequently as | ||
# an input in the join step. | ||
kubernetes_labels = self.kubernetes_labels.copy() | ||
jobset_name = "{{inputs.parameters.jobset-name}}" | ||
kubernetes_labels["task_id_entropy"] = ( | ||
"{{inputs.parameters.task-id-entropy}}" | ||
) | ||
kubernetes_labels["num_parallel"] = "{{inputs.parameters.num-parallel}}" | ||
jobset = KubernetesArgoJobSet( | ||
kubernetes_sdk=kubernetes_sdk, | ||
name=jobset_name, | ||
|
@@ -1959,8 +1957,22 @@ def _container_templates(self): | |
for k, v in env.items(): | ||
jobset.environment_variable(k, v) | ||
|
||
for k, v in kubernetes_labels.items(): | ||
jobset.label(k, v) | ||
# Set labels. Do not allow user-specified task labels to override internal ones. | ||
# | ||
# Explicitly add the task-id-hint label. This is important because this label | ||
# is returned as an Output parameter of this step and is used subsequently as | ||
# an input in the join step. | ||
kubernetes_labels = { | ||
"task_id_entropy": "{{inputs.parameters.task-id-entropy}}", | ||
"num_parallel": "{{inputs.parameters.num-parallel}}", | ||
} | ||
jobset.labels( | ||
{ | ||
**resources["labels"], | ||
**self._base_labels, | ||
**kubernetes_labels, | ||
} | ||
) | ||
|
||
jobset.environment_variable( | ||
"MF_MASTER_ADDR", jobset.jobset_control_addr | ||
|
@@ -1989,27 +2001,23 @@ def _container_templates(self): | |
"TASK_ID_SUFFIX": "metadata.annotations['jobset.sigs.k8s.io/job-index']", | ||
} | ||
) | ||
|
||
# Set annotations. Do not allow user-specified task-specific annotations to override internal ones. | ||
annotations = { | ||
# setting annotations explicitly as they won't be | ||
# passed down from WorkflowTemplate level | ||
"metaflow/step_name": node.name, | ||
"metaflow/attempt": str(retry_count), | ||
"metaflow/run_id": run_id, | ||
"metaflow/production_token": self.production_token, | ||
"metaflow/owner": self.username, | ||
"metaflow/user": "argo-workflows", | ||
"metaflow/flow_name": self.flow.name, | ||
} | ||
if current.get("project_name"): | ||
annotations.update( | ||
{ | ||
"metaflow/project_name": current.project_name, | ||
"metaflow/branch_name": current.branch_name, | ||
"metaflow/project_flow_name": current.project_flow_name, | ||
} | ||
) | ||
for k, v in annotations.items(): | ||
jobset.annotation(k, v) | ||
|
||
jobset.annotations( | ||
{ | ||
**resources["annotations"], | ||
**self._base_annotations, | ||
**annotations, | ||
} | ||
) | ||
|
||
jobset.control.replicas(1) | ||
jobset.worker.replicas("{{=asInt(inputs.parameters.workerCount)}}") | ||
|
@@ -2066,13 +2074,16 @@ def _container_templates(self): | |
minutes_between_retries=minutes_between_retries, | ||
) | ||
.metadata( | ||
ObjectMeta().annotation("metaflow/step_name", node.name) | ||
ObjectMeta() | ||
.annotation("metaflow/step_name", node.name) | ||
# Unfortunately, we can't set the task_id since it is generated | ||
# inside the pod. However, it can be inferred from the annotation | ||
# set by argo-workflows - `workflows.argoproj.io/outputs` - refer | ||
# the field 'task-id' in 'parameters' | ||
# .annotation("metaflow/task_id", ...) | ||
.annotation("metaflow/attempt", retry_count) | ||
.annotations(resources["annotations"]) | ||
.labels(resources["labels"]) | ||
saikonen marked this conversation as resolved.
Show resolved
Hide resolved
|
||
) | ||
# Set emptyDir volume for state management | ||
.empty_dir_volume("out") | ||
|
@@ -2840,44 +2851,16 @@ def _compile_sensor(self): | |
"sdk (https://pypi.org/project/kubernetes/) first." | ||
) | ||
|
||
labels = {"app.kubernetes.io/part-of": "metaflow"} | ||
|
||
annotations = { | ||
"metaflow/production_token": self.production_token, | ||
"metaflow/owner": self.username, | ||
"metaflow/user": "argo-workflows", | ||
"metaflow/flow_name": self.flow.name, | ||
} | ||
if current.get("project_name"): | ||
annotations.update( | ||
{ | ||
"metaflow/project_name": current.project_name, | ||
"metaflow/branch_name": current.branch_name, | ||
"metaflow/project_flow_name": current.project_flow_name, | ||
} | ||
) | ||
|
||
# Useful to paint the UI | ||
trigger_annotations = { | ||
saikonen marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"metaflow/triggered_by": json.dumps( | ||
[ | ||
{key: trigger.get(key) for key in ["name", "type"]} | ||
for trigger in self.triggers | ||
] | ||
) | ||
} | ||
|
||
return ( | ||
Sensor() | ||
.metadata( | ||
# Sensor metadata. | ||
ObjectMeta() | ||
.name(ArgoWorkflows._sensor_name(self.name)) | ||
.namespace(KUBERNETES_NAMESPACE) | ||
.labels(self._base_labels) | ||
.label("app.kubernetes.io/name", "metaflow-sensor") | ||
.label("app.kubernetes.io/part-of", "metaflow") | ||
.labels(self.kubernetes_labels) | ||
.annotations(annotations) | ||
.annotations(self._base_annotations) | ||
) | ||
.spec( | ||
SensorSpec().template( | ||
|
@@ -2887,7 +2870,7 @@ def _compile_sensor(self): | |
ObjectMeta() | ||
.label("app.kubernetes.io/name", "metaflow-sensor") | ||
.label("app.kubernetes.io/part-of", "metaflow") | ||
.annotations(annotations) | ||
.annotations(self._base_annotations) | ||
) | ||
.container( | ||
# Run sensor in guaranteed QoS. The sensor isn't doing a lot | ||
|
@@ -2934,6 +2917,7 @@ def _compile_sensor(self): | |
"metadata": { | ||
"generateName": "%s-" % self.name, | ||
"namespace": KUBERNETES_NAMESPACE, | ||
# Useful to paint the UI | ||
"annotations": { | ||
"metaflow/triggered_by": json.dumps( | ||
[ | ||
|
Uh oh!
There was an error while loading. Please reload this page.