From abd020bd8ae1763bf58a86691fb69372ba1cbc4e Mon Sep 17 00:00:00 2001 From: Tyler Rhodes <767526+trhodeos@users.noreply.github.com> Date: Wed, 16 Oct 2024 14:59:01 -0500 Subject: [PATCH 1/9] Add support for ephemeral volume claims to kubernetes plugin --- metaflow/metaflow_config.py | 3 ++ metaflow/plugins/kubernetes/kubernetes.py | 4 +++ metaflow/plugins/kubernetes/kubernetes_cli.py | 5 +++ .../kubernetes/kubernetes_decorator.py | 13 +++++++- metaflow/plugins/kubernetes/kubernetes_job.py | 32 +++++++++++++++++++ .../plugins/kubernetes/kubernetes_jobsets.py | 32 +++++++++++++++++++ 6 files changed, 88 insertions(+), 1 deletion(-) diff --git a/metaflow/metaflow_config.py b/metaflow/metaflow_config.py index a590b40527b..0055e940c60 100644 --- a/metaflow/metaflow_config.py +++ b/metaflow/metaflow_config.py @@ -352,6 +352,9 @@ KUBERNETES_PERSISTENT_VOLUME_CLAIMS = from_conf( "KUBERNETES_PERSISTENT_VOLUME_CLAIMS", "" ) +KUBERNETES_EPHEMERAL_VOLUME_CLAIMS = from_conf( + "KUBERNETES_EPHEMERAL_VOLUME_CLAIMS", "" +) KUBERNETES_SECRETS = from_conf("KUBERNETES_SECRETS", "") # Default labels for kubernetes pods KUBERNETES_LABELS = from_conf("KUBERNETES_LABELS", "") diff --git a/metaflow/plugins/kubernetes/kubernetes.py b/metaflow/plugins/kubernetes/kubernetes.py index 195dbf041ce..a43fa41f037 100644 --- a/metaflow/plugins/kubernetes/kubernetes.py +++ b/metaflow/plugins/kubernetes/kubernetes.py @@ -191,6 +191,7 @@ def create_jobset( run_time_limit=None, env=None, persistent_volume_claims=None, + ephemeral_volume_claims=None, tolerations=None, labels=None, shared_memory=None, @@ -226,6 +227,7 @@ def create_jobset( tmpfs_size=tmpfs_size, tmpfs_path=tmpfs_path, persistent_volume_claims=persistent_volume_claims, + ephemeral_volume_claims=ephemeral_volume_claims, shared_memory=shared_memory, port=port, num_parallel=num_parallel, @@ -485,6 +487,7 @@ def create_job_object( run_time_limit=None, env=None, persistent_volume_claims=None, + ephemeral_volume_claims=None, tolerations=None, labels=None, shared_memory=None, @@ -529,6 +532,7 @@ def create_job_object( tmpfs_size=tmpfs_size, tmpfs_path=tmpfs_path, persistent_volume_claims=persistent_volume_claims, + ephemeral_volume_claims=ephemeral_volume_claims, shared_memory=shared_memory, port=port, qos=qos, diff --git a/metaflow/plugins/kubernetes/kubernetes_cli.py b/metaflow/plugins/kubernetes/kubernetes_cli.py index def709cadea..51d5888cffc 100644 --- a/metaflow/plugins/kubernetes/kubernetes_cli.py +++ b/metaflow/plugins/kubernetes/kubernetes_cli.py @@ -109,6 +109,9 @@ def kubernetes(): @click.option( "--persistent-volume-claims", type=JSONTypeClass(), default=None, multiple=False ) +@click.option( + "--ephemeral-volume-claims", type=JSONTypeClass(), default=None, multiple=False +) @click.option( "--tolerations", default=None, @@ -156,6 +159,7 @@ def step( tmpfs_path=None, run_time_limit=None, persistent_volume_claims=None, + ephemeral_volume_claims=None, tolerations=None, shared_memory=None, port=None, @@ -297,6 +301,7 @@ def _sync_metadata(): run_time_limit=run_time_limit, env=env, persistent_volume_claims=persistent_volume_claims, + ephemeral_volume_claims=ephemeral_volume_claims, tolerations=tolerations, shared_memory=shared_memory, port=port, diff --git a/metaflow/plugins/kubernetes/kubernetes_decorator.py b/metaflow/plugins/kubernetes/kubernetes_decorator.py index 53f08daf051..42ca005623f 100644 --- a/metaflow/plugins/kubernetes/kubernetes_decorator.py +++ b/metaflow/plugins/kubernetes/kubernetes_decorator.py @@ -102,6 +102,9 @@ class KubernetesDecorator(StepDecorator): persistent_volume_claims : Dict[str, str], optional, default None A map (dictionary) of persistent volumes to be mounted to the pod for this step. The map is from persistent volumes to the path to which the volume is to be mounted, e.g., `{'pvc-name': '/path/to/mount/on'}`. + ephemeral_volume_claims: Dict[str, Any], optional, default None + A map (dictionary) of ephemeral volumes to be mounted to the pod for this step. The map is a name + to a dictionary containing the key 'path' (required), 'metadata' (optional), and 'spec' (optional). shared_memory: int, optional Shared memory size (in MiB) required for this step port: int, optional @@ -136,6 +139,7 @@ class KubernetesDecorator(StepDecorator): "tmpfs_size": None, "tmpfs_path": "/metaflow_temp", "persistent_volume_claims": None, # e.g., {"pvc-name": "/mnt/vol", "another-pvc": "/mnt/vol2"} + "ephemeral_volume_claims": None, # e.g., {"ephemeral-name": {"path": "/mnt/vol", "spec": {"storage_class_name": "my_storage_class"}}} "shared_memory": None, "port": None, "compute_pool": None, @@ -171,6 +175,13 @@ def init(self): self.attributes["persistent_volume_claims"] = json.loads( KUBERNETES_PERSISTENT_VOLUME_CLAIMS ) + if ( + not self.attributes["ephemeral_volume_claims"] + and KUBERNETES_EPHEMERAL_VOLUME_CLAIMS + ): + self.attributes["ephemeral_volume_claims"] = json.loads( + KUBERNETES_EPHEMERAL_VOLUME_CLAIMS + ) if not self.attributes["image_pull_policy"] and KUBERNETES_IMAGE_PULL_POLICY: self.attributes["image_pull_policy"] = KUBERNETES_IMAGE_PULL_POLICY @@ -426,7 +437,7 @@ def runtime_step_cli( "=".join([key, str(val)]) if val else key for key, val in v.items() ] - elif k in ["tolerations", "persistent_volume_claims"]: + elif k in ["tolerations", "persistent_volume_claims", "ephemeral_volume_claims"]: cli_args.command_options[k] = json.dumps(v) else: cli_args.command_options[k] = v diff --git a/metaflow/plugins/kubernetes/kubernetes_job.py b/metaflow/plugins/kubernetes/kubernetes_job.py index 1728cdfd674..d5a003691fe 100644 --- a/metaflow/plugins/kubernetes/kubernetes_job.py +++ b/metaflow/plugins/kubernetes/kubernetes_job.py @@ -205,6 +205,19 @@ def create_job_spec(self): ] if self._kwargs["persistent_volume_claims"] is not None else [] + ) + + ( + [ + client.V1VolumeMount( + mount_path=vals.path, name=name + ) + for name, vals in self._kwargs[ + "ephemeral_volume_claims" + ].items() + ] + if self._kwargs["ephemeral_volume_claims"] + is not None + else [] ), ) ], @@ -266,6 +279,25 @@ def create_job_spec(self): ] if self._kwargs["persistent_volume_claims"] is not None else [] + ) + + ( + [ + client.V1Volume( + name=name, + ephemeral=client.V1EphemeralVolumeSource( + volume_claim_template=client.V1PersistentVolumeClaimTemplate( + metadata=vals["metadata"] if "metadata" in vals else None, + spec=vals.spec, + ) + ), + ) + for name, vals in self._kwargs[ + "ephemeral_volume_claims" + ].items() + ] + if self._kwargs["ephemeral_volume_claims"] + is not None + else [] ), ), ), diff --git a/metaflow/plugins/kubernetes/kubernetes_jobsets.py b/metaflow/plugins/kubernetes/kubernetes_jobsets.py index e7236aea746..593172ba294 100644 --- a/metaflow/plugins/kubernetes/kubernetes_jobsets.py +++ b/metaflow/plugins/kubernetes/kubernetes_jobsets.py @@ -707,6 +707,19 @@ def dump(self): if self._kwargs["persistent_volume_claims"] is not None else [] + ) + + ( + [ + client.V1VolumeMount( + mount_path=vals.path, name=name + ) + for name, vals in self._kwargs[ + "ephemeral_volume_claims" + ].items() + ] + if self._kwargs["ephemeral_volume_claims"] + is not None + else [] ), ) ], @@ -772,6 +785,25 @@ def dump(self): if self._kwargs["persistent_volume_claims"] is not None else [] + ) + + ( + [ + client.V1Volume( + name=name, + ephemeral=client.V1EphemeralVolumeSource( + volume_claim_template=client.V1PersistentVolumeClaimTemplate( + metadata=vals["metadata"] if "metadata" in vals else None, + spec=vals.spec, + ) + ), + ) + for name, vals in self._kwargs[ + "ephemeral_volume_claims" + ].items() + ] + if self._kwargs["ephemeral_volume_claims"] + is not None + else [] ), ), ), From cb28a8ce719a4f4c723eca67e018f79d43711ae8 Mon Sep 17 00:00:00 2001 From: Tyler Rhodes <767526+trhodeos@users.noreply.github.com> Date: Thu, 17 Oct 2024 10:48:08 -0500 Subject: [PATCH 2/9] argo workflows --- metaflow/plugins/argo/argo_workflows.py | 36 +++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index ea0d6c6798e..3b1ddc7c542 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -1951,6 +1951,7 @@ def _container_templates(self): tmpfs_path=tmpfs_path, timeout_in_seconds=run_time_limit, persistent_volume_claims=resources["persistent_volume_claims"], + ephemeral_volume_claims=resources["ephemeral_volume_claims"], shared_memory=shared_memory, port=port, qos=resources["qos"], @@ -2084,6 +2085,7 @@ def _container_templates(self): ) .empty_dir_volume("dhsm", medium="Memory", size_limit=shared_memory) .pvc_volumes(resources.get("persistent_volume_claims")) + .ephemeral_volume_claims(resources.get("ephemeral_volume_claims")) # Set node selectors .node_selectors(resources.get("node_selector")) # Set tolerations @@ -2214,6 +2216,20 @@ def _container_templates(self): if resources.get("persistent_volume_claims") is not None else [] + ) + # Support ephemeral volume claims. + + ( + [ + kubernetes_sdk.V1VolumeMount( + name=claim, mount_path=values.path + ) + for claim, values in resources.get( + "ephemeral_volume_claims" + ).items() + ] + if resources.get("ephemeral_volume_claims") + is not None + else [] ), ).to_dict() ) @@ -3500,6 +3516,26 @@ def pvc_volumes(self, pvcs=None): ) return self + def ephemeral_volume_claims(self, claims=None): + """ + Create and attach Ephemeral Volume Claims as volumes. + + Parameters: + ----------- + claims: Optional[Dict] + a dictionary of ephemeral volumes name's to the paths they should be mounted to. e.g. + {"claim-1": {"path": "/mnt/path1", "spec": {"storageClassName": "my-claim"}, "metadata": {"labels": ["abc123"]}}} + """ + if claims is None: + return self + if "volumes" not in self.payload: + self.payload["volumes"] = [] + for name, values in claims.items(): + self.payload["volumes"].append( + {"name": name, "ephemeral": {"metadata": values["metadata"], "spec": values["spec"]}} + ) + return self + def node_selectors(self, node_selectors): if "nodeSelector" not in self.payload: self.payload["nodeSelector"] = {} From edb9e9b0126b690f4d181179c7d0bf291312054b Mon Sep 17 00:00:00 2001 From: Tyler Rhodes <767526+trhodeos@users.noreply.github.com> Date: Thu, 17 Oct 2024 14:45:49 -0500 Subject: [PATCH 3/9] fix --- metaflow/plugins/kubernetes/constants.py | 8 ++++++++ metaflow/plugins/kubernetes/kubernetes_job.py | 10 ++++++---- metaflow/plugins/kubernetes/kubernetes_jobsets.py | 7 +++++-- 3 files changed, 19 insertions(+), 6 deletions(-) create mode 100644 metaflow/plugins/kubernetes/constants.py diff --git a/metaflow/plugins/kubernetes/constants.py b/metaflow/plugins/kubernetes/constants.py new file mode 100644 index 00000000000..221020d45e0 --- /dev/null +++ b/metaflow/plugins/kubernetes/constants.py @@ -0,0 +1,8 @@ +_VOLUME_CLAIM_TEMPLATE_DEFAULTS = { + "accessModes": [ "ReadWriteOnce" ], + "resources": { + "requests": { + "storage": "1Gi" + } + } +} diff --git a/metaflow/plugins/kubernetes/kubernetes_job.py b/metaflow/plugins/kubernetes/kubernetes_job.py index d5a003691fe..a802a0c4fe9 100644 --- a/metaflow/plugins/kubernetes/kubernetes_job.py +++ b/metaflow/plugins/kubernetes/kubernetes_job.py @@ -16,7 +16,9 @@ ) # We need this import for Kubernetes Client. from .kube_utils import qos_requests_and_limits - +from .constants import ( + _VOLUME_CLAIM_TEMPLATE_DEFAULTS +) class KubernetesJobException(MetaflowException): headline = "Kubernetes job error" @@ -209,7 +211,7 @@ def create_job_spec(self): + ( [ client.V1VolumeMount( - mount_path=vals.path, name=name + mount_path=vals["path"], name=name ) for name, vals in self._kwargs[ "ephemeral_volume_claims" @@ -286,8 +288,8 @@ def create_job_spec(self): name=name, ephemeral=client.V1EphemeralVolumeSource( volume_claim_template=client.V1PersistentVolumeClaimTemplate( - metadata=vals["metadata"] if "metadata" in vals else None, - spec=vals.spec, + metadata=vals.get("metadata", {}), + spec={**vals.get("spec", {}), **_VOLUME_CLAIM_TEMPLATE_DEFAULTS}, ) ), ) diff --git a/metaflow/plugins/kubernetes/kubernetes_jobsets.py b/metaflow/plugins/kubernetes/kubernetes_jobsets.py index 593172ba294..5f4a08157a7 100644 --- a/metaflow/plugins/kubernetes/kubernetes_jobsets.py +++ b/metaflow/plugins/kubernetes/kubernetes_jobsets.py @@ -10,6 +10,9 @@ from metaflow.metaflow_config import KUBERNETES_SECRETS from .kube_utils import qos_requests_and_limits +from .constants import ( + _VOLUME_CLAIM_TEMPLATE_DEFAULTS +) class KubernetesJobsetException(MetaflowException): @@ -792,8 +795,8 @@ def dump(self): name=name, ephemeral=client.V1EphemeralVolumeSource( volume_claim_template=client.V1PersistentVolumeClaimTemplate( - metadata=vals["metadata"] if "metadata" in vals else None, - spec=vals.spec, + metadata=vals.get("metadata", {}), + spec={**vals.get("spec", {}), **_VOLUME_CLAIM_TEMPLATE_DEFAULTS}, ) ), ) From e3d7553a2b35d67ac2685a1ecb04d9740695eabc Mon Sep 17 00:00:00 2001 From: Tyler Rhodes <767526+trhodeos@users.noreply.github.com> Date: Thu, 17 Oct 2024 15:34:45 -0500 Subject: [PATCH 4/9] fixes --- metaflow/plugins/airflow/airflow.py | 1 + metaflow/plugins/argo/argo_workflows.py | 7 ++++--- metaflow/plugins/kubernetes/constants.py | 3 ++- metaflow/plugins/kubernetes/kubernetes.py | 9 +++++++++ metaflow/plugins/kubernetes/kubernetes_decorator.py | 1 + metaflow/plugins/kubernetes/kubernetes_job.py | 4 ++-- metaflow/plugins/kubernetes/kubernetes_jobsets.py | 8 ++++---- 7 files changed, 23 insertions(+), 10 deletions(-) diff --git a/metaflow/plugins/airflow/airflow.py b/metaflow/plugins/airflow/airflow.py index 304fa9f3bd9..6724a56fa5b 100644 --- a/metaflow/plugins/airflow/airflow.py +++ b/metaflow/plugins/airflow/airflow.py @@ -667,6 +667,7 @@ def _visit(node, workflow, exit_node=None): "use_tmpfs", "tmpfs_size", "persistent_volume_claims", + "ephemeral_volume_claims", "image_pull_policy", ]: if kube_deco[attr]: diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index 3b1ddc7c542..efa06cc71eb 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -58,6 +58,7 @@ parse_kube_keyvalue_list, validate_kube_labels, ) +from metaflow.plugins.kubernetes.constants import VOLUME_CLAIM_TEMPLATE_DEFAULTS from metaflow.plugins.kubernetes.kubernetes_jobsets import KubernetesArgoJobSet from metaflow.unbounded_foreach import UBF_CONTROL, UBF_TASK from metaflow.user_configs.config_options import ConfigInput @@ -2221,9 +2222,9 @@ def _container_templates(self): + ( [ kubernetes_sdk.V1VolumeMount( - name=claim, mount_path=values.path + name=name, mount_path=values["path"] ) - for claim, values in resources.get( + for name, values in resources.get( "ephemeral_volume_claims" ).items() ] @@ -3532,7 +3533,7 @@ def ephemeral_volume_claims(self, claims=None): self.payload["volumes"] = [] for name, values in claims.items(): self.payload["volumes"].append( - {"name": name, "ephemeral": {"metadata": values["metadata"], "spec": values["spec"]}} + {"name": name, "ephemeral": {"volumeClaimTemplate": {"metadata": values.get("metadata", None), "spec": {**values.get("spec", {"abc": "def"}), **VOLUME_CLAIM_TEMPLATE_DEFAULTS}}}} ) return self diff --git a/metaflow/plugins/kubernetes/constants.py b/metaflow/plugins/kubernetes/constants.py index 221020d45e0..b8ce6b49e07 100644 --- a/metaflow/plugins/kubernetes/constants.py +++ b/metaflow/plugins/kubernetes/constants.py @@ -1,4 +1,4 @@ -_VOLUME_CLAIM_TEMPLATE_DEFAULTS = { +VOLUME_CLAIM_TEMPLATE_DEFAULTS = { "accessModes": [ "ReadWriteOnce" ], "resources": { "requests": { @@ -6,3 +6,4 @@ } } } + diff --git a/metaflow/plugins/kubernetes/kubernetes.py b/metaflow/plugins/kubernetes/kubernetes.py index a43fa41f037..d228b9dce13 100644 --- a/metaflow/plugins/kubernetes/kubernetes.py +++ b/metaflow/plugins/kubernetes/kubernetes.py @@ -67,6 +67,15 @@ "{METAFLOW_PARALLEL_STEP_CLI_OPTIONS_TEMPLATE}" ) +VOLUME_CLAIM_TEMPLATE_DEFAULTS = { + "accessModes": [ "ReadWriteOnce" ], + "resources": { + "requests": { + "storage": "1Gi" + } + } +} + class KubernetesException(MetaflowException): headline = "Kubernetes error" diff --git a/metaflow/plugins/kubernetes/kubernetes_decorator.py b/metaflow/plugins/kubernetes/kubernetes_decorator.py index 42ca005623f..4f72d2f220c 100644 --- a/metaflow/plugins/kubernetes/kubernetes_decorator.py +++ b/metaflow/plugins/kubernetes/kubernetes_decorator.py @@ -22,6 +22,7 @@ KUBERNETES_NAMESPACE, KUBERNETES_NODE_SELECTOR, KUBERNETES_PERSISTENT_VOLUME_CLAIMS, + KUBERNETES_EPHEMERAL_VOLUME_CLAIMS, KUBERNETES_PORT, KUBERNETES_SERVICE_ACCOUNT, KUBERNETES_SHARED_MEMORY, diff --git a/metaflow/plugins/kubernetes/kubernetes_job.py b/metaflow/plugins/kubernetes/kubernetes_job.py index a802a0c4fe9..f8974e27507 100644 --- a/metaflow/plugins/kubernetes/kubernetes_job.py +++ b/metaflow/plugins/kubernetes/kubernetes_job.py @@ -17,7 +17,7 @@ from .kube_utils import qos_requests_and_limits from .constants import ( - _VOLUME_CLAIM_TEMPLATE_DEFAULTS + VOLUME_CLAIM_TEMPLATE_DEFAULTS ) class KubernetesJobException(MetaflowException): @@ -289,7 +289,7 @@ def create_job_spec(self): ephemeral=client.V1EphemeralVolumeSource( volume_claim_template=client.V1PersistentVolumeClaimTemplate( metadata=vals.get("metadata", {}), - spec={**vals.get("spec", {}), **_VOLUME_CLAIM_TEMPLATE_DEFAULTS}, + spec={**vals.get("spec", {}), **VOLUME_CLAIM_TEMPLATE_DEFAULTS}, ) ), ) diff --git a/metaflow/plugins/kubernetes/kubernetes_jobsets.py b/metaflow/plugins/kubernetes/kubernetes_jobsets.py index 5f4a08157a7..f960f4331c7 100644 --- a/metaflow/plugins/kubernetes/kubernetes_jobsets.py +++ b/metaflow/plugins/kubernetes/kubernetes_jobsets.py @@ -11,7 +11,7 @@ from .kube_utils import qos_requests_and_limits from .constants import ( - _VOLUME_CLAIM_TEMPLATE_DEFAULTS + VOLUME_CLAIM_TEMPLATE_DEFAULTS ) @@ -714,7 +714,7 @@ def dump(self): + ( [ client.V1VolumeMount( - mount_path=vals.path, name=name + mount_path=vals["path"], name=name ) for name, vals in self._kwargs[ "ephemeral_volume_claims" @@ -795,8 +795,8 @@ def dump(self): name=name, ephemeral=client.V1EphemeralVolumeSource( volume_claim_template=client.V1PersistentVolumeClaimTemplate( - metadata=vals.get("metadata", {}), - spec={**vals.get("spec", {}), **_VOLUME_CLAIM_TEMPLATE_DEFAULTS}, + metadata=vals.get("metadata", None), + spec={**vals.get("spec", {}), **VOLUME_CLAIM_TEMPLATE_DEFAULTS}, ) ), ) From b293bb2c2440968d76b8680769b006df0e732461 Mon Sep 17 00:00:00 2001 From: Tyler Rhodes <767526+trhodeos@users.noreply.github.com> Date: Thu, 17 Oct 2024 15:40:41 -0500 Subject: [PATCH 5/9] fmt --- metaflow/metaflow_config.py | 4 +--- metaflow/plugins/argo/argo_workflows.py | 13 +++++++++++- metaflow/plugins/kubernetes/constants.py | 9 ++------ metaflow/plugins/kubernetes/kubernetes.py | 8 ++----- .../kubernetes/kubernetes_decorator.py | 6 +++++- metaflow/plugins/kubernetes/kubernetes_job.py | 21 +++++++++---------- .../plugins/kubernetes/kubernetes_jobsets.py | 15 ++++++------- 7 files changed, 40 insertions(+), 36 deletions(-) diff --git a/metaflow/metaflow_config.py b/metaflow/metaflow_config.py index 0055e940c60..6b924b15f39 100644 --- a/metaflow/metaflow_config.py +++ b/metaflow/metaflow_config.py @@ -352,9 +352,7 @@ KUBERNETES_PERSISTENT_VOLUME_CLAIMS = from_conf( "KUBERNETES_PERSISTENT_VOLUME_CLAIMS", "" ) -KUBERNETES_EPHEMERAL_VOLUME_CLAIMS = from_conf( - "KUBERNETES_EPHEMERAL_VOLUME_CLAIMS", "" -) +KUBERNETES_EPHEMERAL_VOLUME_CLAIMS = from_conf("KUBERNETES_EPHEMERAL_VOLUME_CLAIMS", "") KUBERNETES_SECRETS = from_conf("KUBERNETES_SECRETS", "") # Default labels for kubernetes pods KUBERNETES_LABELS = from_conf("KUBERNETES_LABELS", "") diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index efa06cc71eb..a0060a11a47 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -3533,7 +3533,18 @@ def ephemeral_volume_claims(self, claims=None): self.payload["volumes"] = [] for name, values in claims.items(): self.payload["volumes"].append( - {"name": name, "ephemeral": {"volumeClaimTemplate": {"metadata": values.get("metadata", None), "spec": {**values.get("spec", {"abc": "def"}), **VOLUME_CLAIM_TEMPLATE_DEFAULTS}}}} + { + "name": name, + "ephemeral": { + "volumeClaimTemplate": { + "metadata": values.get("metadata", None), + "spec": { + **values.get("spec", {"abc": "def"}), + **VOLUME_CLAIM_TEMPLATE_DEFAULTS, + }, + } + }, + } ) return self diff --git a/metaflow/plugins/kubernetes/constants.py b/metaflow/plugins/kubernetes/constants.py index b8ce6b49e07..b23f05f9e3a 100644 --- a/metaflow/plugins/kubernetes/constants.py +++ b/metaflow/plugins/kubernetes/constants.py @@ -1,9 +1,4 @@ VOLUME_CLAIM_TEMPLATE_DEFAULTS = { - "accessModes": [ "ReadWriteOnce" ], - "resources": { - "requests": { - "storage": "1Gi" - } - } + "accessModes": ["ReadWriteOnce"], + "resources": {"requests": {"storage": "1Gi"}}, } - diff --git a/metaflow/plugins/kubernetes/kubernetes.py b/metaflow/plugins/kubernetes/kubernetes.py index d228b9dce13..ab36992f9a1 100644 --- a/metaflow/plugins/kubernetes/kubernetes.py +++ b/metaflow/plugins/kubernetes/kubernetes.py @@ -68,12 +68,8 @@ ) VOLUME_CLAIM_TEMPLATE_DEFAULTS = { - "accessModes": [ "ReadWriteOnce" ], - "resources": { - "requests": { - "storage": "1Gi" - } - } + "accessModes": ["ReadWriteOnce"], + "resources": {"requests": {"storage": "1Gi"}}, } diff --git a/metaflow/plugins/kubernetes/kubernetes_decorator.py b/metaflow/plugins/kubernetes/kubernetes_decorator.py index 4f72d2f220c..9d68806dd44 100644 --- a/metaflow/plugins/kubernetes/kubernetes_decorator.py +++ b/metaflow/plugins/kubernetes/kubernetes_decorator.py @@ -438,7 +438,11 @@ def runtime_step_cli( "=".join([key, str(val)]) if val else key for key, val in v.items() ] - elif k in ["tolerations", "persistent_volume_claims", "ephemeral_volume_claims"]: + elif k in [ + "tolerations", + "persistent_volume_claims", + "ephemeral_volume_claims", + ]: cli_args.command_options[k] = json.dumps(v) else: cli_args.command_options[k] = v diff --git a/metaflow/plugins/kubernetes/kubernetes_job.py b/metaflow/plugins/kubernetes/kubernetes_job.py index f8974e27507..b2a5c443db1 100644 --- a/metaflow/plugins/kubernetes/kubernetes_job.py +++ b/metaflow/plugins/kubernetes/kubernetes_job.py @@ -16,9 +16,7 @@ ) # We need this import for Kubernetes Client. from .kube_utils import qos_requests_and_limits -from .constants import ( - VOLUME_CLAIM_TEMPLATE_DEFAULTS -) +from .constants import VOLUME_CLAIM_TEMPLATE_DEFAULTS class KubernetesJobException(MetaflowException): headline = "Kubernetes job error" @@ -217,8 +215,7 @@ def create_job_spec(self): "ephemeral_volume_claims" ].items() ] - if self._kwargs["ephemeral_volume_claims"] - is not None + if self._kwargs["ephemeral_volume_claims"] is not None else [] ), ) @@ -287,18 +284,20 @@ def create_job_spec(self): client.V1Volume( name=name, ephemeral=client.V1EphemeralVolumeSource( - volume_claim_template=client.V1PersistentVolumeClaimTemplate( - metadata=vals.get("metadata", {}), - spec={**vals.get("spec", {}), **VOLUME_CLAIM_TEMPLATE_DEFAULTS}, - ) + volume_claim_template=client.V1PersistentVolumeClaimTemplate( + metadata=vals.get("metadata", {}), + spec={ + **vals.get("spec", {}), + **VOLUME_CLAIM_TEMPLATE_DEFAULTS, + }, + ) ), ) for name, vals in self._kwargs[ "ephemeral_volume_claims" ].items() ] - if self._kwargs["ephemeral_volume_claims"] - is not None + if self._kwargs["ephemeral_volume_claims"] is not None else [] ), ), diff --git a/metaflow/plugins/kubernetes/kubernetes_jobsets.py b/metaflow/plugins/kubernetes/kubernetes_jobsets.py index f960f4331c7..f444dfcd9f3 100644 --- a/metaflow/plugins/kubernetes/kubernetes_jobsets.py +++ b/metaflow/plugins/kubernetes/kubernetes_jobsets.py @@ -10,9 +10,7 @@ from metaflow.metaflow_config import KUBERNETES_SECRETS from .kube_utils import qos_requests_and_limits -from .constants import ( - VOLUME_CLAIM_TEMPLATE_DEFAULTS -) +from .constants import VOLUME_CLAIM_TEMPLATE_DEFAULTS class KubernetesJobsetException(MetaflowException): @@ -794,10 +792,13 @@ def dump(self): client.V1Volume( name=name, ephemeral=client.V1EphemeralVolumeSource( - volume_claim_template=client.V1PersistentVolumeClaimTemplate( - metadata=vals.get("metadata", None), - spec={**vals.get("spec", {}), **VOLUME_CLAIM_TEMPLATE_DEFAULTS}, - ) + volume_claim_template=client.V1PersistentVolumeClaimTemplate( + metadata=vals.get("metadata", None), + spec={ + **vals.get("spec", {}), + **VOLUME_CLAIM_TEMPLATE_DEFAULTS, + }, + ) ), ) for name, vals in self._kwargs[ From 8225344e0f3b69a498ff49f4a927340d8d83231b Mon Sep 17 00:00:00 2001 From: Tyler Rhodes <767526+trhodeos@users.noreply.github.com> Date: Thu, 17 Oct 2024 16:00:34 -0500 Subject: [PATCH 6/9] fix --- metaflow/plugins/argo/argo_workflows.py | 3 +-- metaflow/plugins/kubernetes/kubernetes_decorator.py | 2 +- metaflow/plugins/kubernetes/kubernetes_job.py | 1 - metaflow/plugins/kubernetes/kubernetes_jobsets.py | 1 - 4 files changed, 2 insertions(+), 5 deletions(-) diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index a0060a11a47..9223ea7104d 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -3537,9 +3537,8 @@ def ephemeral_volume_claims(self, claims=None): "name": name, "ephemeral": { "volumeClaimTemplate": { - "metadata": values.get("metadata", None), "spec": { - **values.get("spec", {"abc": "def"}), + **values.get("spec", {}), **VOLUME_CLAIM_TEMPLATE_DEFAULTS, }, } diff --git a/metaflow/plugins/kubernetes/kubernetes_decorator.py b/metaflow/plugins/kubernetes/kubernetes_decorator.py index 9d68806dd44..54970bbdc27 100644 --- a/metaflow/plugins/kubernetes/kubernetes_decorator.py +++ b/metaflow/plugins/kubernetes/kubernetes_decorator.py @@ -140,7 +140,7 @@ class KubernetesDecorator(StepDecorator): "tmpfs_size": None, "tmpfs_path": "/metaflow_temp", "persistent_volume_claims": None, # e.g., {"pvc-name": "/mnt/vol", "another-pvc": "/mnt/vol2"} - "ephemeral_volume_claims": None, # e.g., {"ephemeral-name": {"path": "/mnt/vol", "spec": {"storage_class_name": "my_storage_class"}}} + "ephemeral_volume_claims": None, # e.g., {"ephemeral-name": {"path": "/mnt/vol", "spec": {"storageClassName": "my_storage_class"}}} "shared_memory": None, "port": None, "compute_pool": None, diff --git a/metaflow/plugins/kubernetes/kubernetes_job.py b/metaflow/plugins/kubernetes/kubernetes_job.py index b2a5c443db1..9a5743cbf3e 100644 --- a/metaflow/plugins/kubernetes/kubernetes_job.py +++ b/metaflow/plugins/kubernetes/kubernetes_job.py @@ -285,7 +285,6 @@ def create_job_spec(self): name=name, ephemeral=client.V1EphemeralVolumeSource( volume_claim_template=client.V1PersistentVolumeClaimTemplate( - metadata=vals.get("metadata", {}), spec={ **vals.get("spec", {}), **VOLUME_CLAIM_TEMPLATE_DEFAULTS, diff --git a/metaflow/plugins/kubernetes/kubernetes_jobsets.py b/metaflow/plugins/kubernetes/kubernetes_jobsets.py index f444dfcd9f3..c141c17ea24 100644 --- a/metaflow/plugins/kubernetes/kubernetes_jobsets.py +++ b/metaflow/plugins/kubernetes/kubernetes_jobsets.py @@ -793,7 +793,6 @@ def dump(self): name=name, ephemeral=client.V1EphemeralVolumeSource( volume_claim_template=client.V1PersistentVolumeClaimTemplate( - metadata=vals.get("metadata", None), spec={ **vals.get("spec", {}), **VOLUME_CLAIM_TEMPLATE_DEFAULTS, From c3b7b6a5d93c55bbb6b0d26e0ce5604313760537 Mon Sep 17 00:00:00 2001 From: Tyler Rhodes <767526+trhodeos@users.noreply.github.com> Date: Mon, 4 Nov 2024 15:55:34 -0600 Subject: [PATCH 7/9] fix --- metaflow/plugins/argo/argo_workflows.py | 2 +- metaflow/plugins/kubernetes/kubernetes.py | 5 ----- metaflow/plugins/kubernetes/kubernetes_job.py | 2 +- metaflow/plugins/kubernetes/kubernetes_jobsets.py | 2 +- 4 files changed, 3 insertions(+), 8 deletions(-) diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index 9223ea7104d..ecdf2b5a365 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -3538,8 +3538,8 @@ def ephemeral_volume_claims(self, claims=None): "ephemeral": { "volumeClaimTemplate": { "spec": { - **values.get("spec", {}), **VOLUME_CLAIM_TEMPLATE_DEFAULTS, + **values.get("spec", {}), }, } }, diff --git a/metaflow/plugins/kubernetes/kubernetes.py b/metaflow/plugins/kubernetes/kubernetes.py index ab36992f9a1..a43fa41f037 100644 --- a/metaflow/plugins/kubernetes/kubernetes.py +++ b/metaflow/plugins/kubernetes/kubernetes.py @@ -67,11 +67,6 @@ "{METAFLOW_PARALLEL_STEP_CLI_OPTIONS_TEMPLATE}" ) -VOLUME_CLAIM_TEMPLATE_DEFAULTS = { - "accessModes": ["ReadWriteOnce"], - "resources": {"requests": {"storage": "1Gi"}}, -} - class KubernetesException(MetaflowException): headline = "Kubernetes error" diff --git a/metaflow/plugins/kubernetes/kubernetes_job.py b/metaflow/plugins/kubernetes/kubernetes_job.py index 9a5743cbf3e..b639ff8f3ee 100644 --- a/metaflow/plugins/kubernetes/kubernetes_job.py +++ b/metaflow/plugins/kubernetes/kubernetes_job.py @@ -286,8 +286,8 @@ def create_job_spec(self): ephemeral=client.V1EphemeralVolumeSource( volume_claim_template=client.V1PersistentVolumeClaimTemplate( spec={ - **vals.get("spec", {}), **VOLUME_CLAIM_TEMPLATE_DEFAULTS, + **vals.get("spec", {}), }, ) ), diff --git a/metaflow/plugins/kubernetes/kubernetes_jobsets.py b/metaflow/plugins/kubernetes/kubernetes_jobsets.py index c141c17ea24..0bf68209537 100644 --- a/metaflow/plugins/kubernetes/kubernetes_jobsets.py +++ b/metaflow/plugins/kubernetes/kubernetes_jobsets.py @@ -794,8 +794,8 @@ def dump(self): ephemeral=client.V1EphemeralVolumeSource( volume_claim_template=client.V1PersistentVolumeClaimTemplate( spec={ - **vals.get("spec", {}), **VOLUME_CLAIM_TEMPLATE_DEFAULTS, + **vals.get("spec", {}), }, ) ), From dcfe5fff3ac3f92928fa943013002907cc2b1e12 Mon Sep 17 00:00:00 2001 From: Tyler Rhodes <767526+trhodeos@users.noreply.github.com> Date: Tue, 10 Dec 2024 09:53:27 -0600 Subject: [PATCH 8/9] fix --- metaflow/plugins/argo/argo_workflows.py | 2 +- metaflow/plugins/kubernetes/constants.py | 4 ---- metaflow/plugins/kubernetes/kube_utils.py | 5 +++++ metaflow/plugins/kubernetes/kubernetes_job.py | 3 +-- metaflow/plugins/kubernetes/kubernetes_jobsets.py | 3 +-- 5 files changed, 8 insertions(+), 9 deletions(-) diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index ecdf2b5a365..0e2e049c0ad 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -58,7 +58,7 @@ parse_kube_keyvalue_list, validate_kube_labels, ) -from metaflow.plugins.kubernetes.constants import VOLUME_CLAIM_TEMPLATE_DEFAULTS +from metaflow.plugins.kubernetes.kube_utils import VOLUME_CLAIM_TEMPLATE_DEFAULTS from metaflow.plugins.kubernetes.kubernetes_jobsets import KubernetesArgoJobSet from metaflow.unbounded_foreach import UBF_CONTROL, UBF_TASK from metaflow.user_configs.config_options import ConfigInput diff --git a/metaflow/plugins/kubernetes/constants.py b/metaflow/plugins/kubernetes/constants.py index b23f05f9e3a..e69de29bb2d 100644 --- a/metaflow/plugins/kubernetes/constants.py +++ b/metaflow/plugins/kubernetes/constants.py @@ -1,4 +0,0 @@ -VOLUME_CLAIM_TEMPLATE_DEFAULTS = { - "accessModes": ["ReadWriteOnce"], - "resources": {"requests": {"storage": "1Gi"}}, -} diff --git a/metaflow/plugins/kubernetes/kube_utils.py b/metaflow/plugins/kubernetes/kube_utils.py index e19207df2ae..03df92bdee5 100644 --- a/metaflow/plugins/kubernetes/kube_utils.py +++ b/metaflow/plugins/kubernetes/kube_utils.py @@ -2,6 +2,11 @@ from metaflow.util import get_username, get_latest_run_id +VOLUME_CLAIM_TEMPLATE_DEFAULTS = { + "accessModes": ["ReadWriteOnce"], + "resources": {"requests": {"storage": "1Gi"}}, +} + def parse_cli_options(flow_name, run_id, user, my_runs, echo): if user and my_runs: raise CommandException("--user and --my-runs are mutually exclusive.") diff --git a/metaflow/plugins/kubernetes/kubernetes_job.py b/metaflow/plugins/kubernetes/kubernetes_job.py index b639ff8f3ee..d8c717dfc88 100644 --- a/metaflow/plugins/kubernetes/kubernetes_job.py +++ b/metaflow/plugins/kubernetes/kubernetes_job.py @@ -15,8 +15,7 @@ KubernetesJobSet, ) # We need this import for Kubernetes Client. -from .kube_utils import qos_requests_and_limits -from .constants import VOLUME_CLAIM_TEMPLATE_DEFAULTS +from .kube_utils import qos_requests_and_limits, VOLUME_CLAIM_TEMPLATE_DEFAULTS class KubernetesJobException(MetaflowException): headline = "Kubernetes job error" diff --git a/metaflow/plugins/kubernetes/kubernetes_jobsets.py b/metaflow/plugins/kubernetes/kubernetes_jobsets.py index 0bf68209537..5519d65ea7d 100644 --- a/metaflow/plugins/kubernetes/kubernetes_jobsets.py +++ b/metaflow/plugins/kubernetes/kubernetes_jobsets.py @@ -9,8 +9,7 @@ from metaflow.tracing import inject_tracing_vars from metaflow.metaflow_config import KUBERNETES_SECRETS -from .kube_utils import qos_requests_and_limits -from .constants import VOLUME_CLAIM_TEMPLATE_DEFAULTS +from .kube_utils import qos_requests_and_limits, VOLUME_CLAIM_TEMPLATE_DEFAULTS class KubernetesJobsetException(MetaflowException): From 872b90f3208f3c236848388c6d15a3009295ec37 Mon Sep 17 00:00:00 2001 From: Tyler Rhodes <767526+trhodeos@users.noreply.github.com> Date: Tue, 10 Dec 2024 09:54:42 -0600 Subject: [PATCH 9/9] fmt --- metaflow/plugins/kubernetes/kube_utils.py | 1 + metaflow/plugins/kubernetes/kubernetes_job.py | 1 + 2 files changed, 2 insertions(+) diff --git a/metaflow/plugins/kubernetes/kube_utils.py b/metaflow/plugins/kubernetes/kube_utils.py index 03df92bdee5..0c554b195c5 100644 --- a/metaflow/plugins/kubernetes/kube_utils.py +++ b/metaflow/plugins/kubernetes/kube_utils.py @@ -7,6 +7,7 @@ "resources": {"requests": {"storage": "1Gi"}}, } + def parse_cli_options(flow_name, run_id, user, my_runs, echo): if user and my_runs: raise CommandException("--user and --my-runs are mutually exclusive.") diff --git a/metaflow/plugins/kubernetes/kubernetes_job.py b/metaflow/plugins/kubernetes/kubernetes_job.py index d8c717dfc88..c1b39d870e7 100644 --- a/metaflow/plugins/kubernetes/kubernetes_job.py +++ b/metaflow/plugins/kubernetes/kubernetes_job.py @@ -17,6 +17,7 @@ from .kube_utils import qos_requests_and_limits, VOLUME_CLAIM_TEMPLATE_DEFAULTS + class KubernetesJobException(MetaflowException): headline = "Kubernetes job error"