Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for ephemeral volume claims to kubernetes/argo #2103

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions metaflow/metaflow_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@
KUBERNETES_PERSISTENT_VOLUME_CLAIMS = from_conf(
"KUBERNETES_PERSISTENT_VOLUME_CLAIMS", ""
)
KUBERNETES_EPHEMERAL_VOLUME_CLAIMS = from_conf("KUBERNETES_EPHEMERAL_VOLUME_CLAIMS", "")
KUBERNETES_SECRETS = from_conf("KUBERNETES_SECRETS", "")
# Default labels for kubernetes pods
KUBERNETES_LABELS = from_conf("KUBERNETES_LABELS", "")
Expand Down
1 change: 1 addition & 0 deletions metaflow/plugins/airflow/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,7 @@ def _visit(node, workflow, exit_node=None):
"use_tmpfs",
"tmpfs_size",
"persistent_volume_claims",
"ephemeral_volume_claims",
"image_pull_policy",
]:
if kube_deco[attr]:
Expand Down
47 changes: 47 additions & 0 deletions metaflow/plugins/argo/argo_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
parse_kube_keyvalue_list,
validate_kube_labels,
)
from metaflow.plugins.kubernetes.kube_utils import VOLUME_CLAIM_TEMPLATE_DEFAULTS
from metaflow.plugins.kubernetes.kubernetes_jobsets import KubernetesArgoJobSet
from metaflow.unbounded_foreach import UBF_CONTROL, UBF_TASK
from metaflow.user_configs.config_options import ConfigInput
Expand Down Expand Up @@ -1951,6 +1952,7 @@ def _container_templates(self):
tmpfs_path=tmpfs_path,
timeout_in_seconds=run_time_limit,
persistent_volume_claims=resources["persistent_volume_claims"],
ephemeral_volume_claims=resources["ephemeral_volume_claims"],
shared_memory=shared_memory,
port=port,
qos=resources["qos"],
Expand Down Expand Up @@ -2084,6 +2086,7 @@ def _container_templates(self):
)
.empty_dir_volume("dhsm", medium="Memory", size_limit=shared_memory)
.pvc_volumes(resources.get("persistent_volume_claims"))
.ephemeral_volume_claims(resources.get("ephemeral_volume_claims"))
# Set node selectors
.node_selectors(resources.get("node_selector"))
# Set tolerations
Expand Down Expand Up @@ -2214,6 +2217,20 @@ def _container_templates(self):
if resources.get("persistent_volume_claims")
is not None
else []
)
# Support ephemeral volume claims.
+ (
[
kubernetes_sdk.V1VolumeMount(
name=name, mount_path=values["path"]
)
for name, values in resources.get(
"ephemeral_volume_claims"
).items()
]
if resources.get("ephemeral_volume_claims")
is not None
else []
),
).to_dict()
)
Expand Down Expand Up @@ -3500,6 +3517,36 @@ def pvc_volumes(self, pvcs=None):
)
return self

def ephemeral_volume_claims(self, claims=None):
trhodeos marked this conversation as resolved.
Show resolved Hide resolved
"""
Create and attach Ephemeral Volume Claims as volumes.

Parameters:
-----------
claims: Optional[Dict]
a dictionary of ephemeral volumes name's to the paths they should be mounted to. e.g.
{"claim-1": {"path": "/mnt/path1", "spec": {"storageClassName": "my-claim"}, "metadata": {"labels": ["abc123"]}}}
"""
if claims is None:
return self
if "volumes" not in self.payload:
self.payload["volumes"] = []
for name, values in claims.items():
self.payload["volumes"].append(
{
"name": name,
"ephemeral": {
"volumeClaimTemplate": {
"spec": {
**VOLUME_CLAIM_TEMPLATE_DEFAULTS,
trhodeos marked this conversation as resolved.
Show resolved Hide resolved
**values.get("spec", {}),
},
}
},
}
)
return self

def node_selectors(self, node_selectors):
if "nodeSelector" not in self.payload:
self.payload["nodeSelector"] = {}
Expand Down
Empty file.
6 changes: 6 additions & 0 deletions metaflow/plugins/kubernetes/kube_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@
from metaflow.util import get_username, get_latest_run_id


VOLUME_CLAIM_TEMPLATE_DEFAULTS = {
"accessModes": ["ReadWriteOnce"],
"resources": {"requests": {"storage": "1Gi"}},
}


def parse_cli_options(flow_name, run_id, user, my_runs, echo):
if user and my_runs:
raise CommandException("--user and --my-runs are mutually exclusive.")
Expand Down
4 changes: 4 additions & 0 deletions metaflow/plugins/kubernetes/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ def create_jobset(
run_time_limit=None,
env=None,
persistent_volume_claims=None,
ephemeral_volume_claims=None,
tolerations=None,
labels=None,
shared_memory=None,
Expand Down Expand Up @@ -226,6 +227,7 @@ def create_jobset(
tmpfs_size=tmpfs_size,
tmpfs_path=tmpfs_path,
persistent_volume_claims=persistent_volume_claims,
ephemeral_volume_claims=ephemeral_volume_claims,
shared_memory=shared_memory,
port=port,
num_parallel=num_parallel,
Expand Down Expand Up @@ -485,6 +487,7 @@ def create_job_object(
run_time_limit=None,
env=None,
persistent_volume_claims=None,
ephemeral_volume_claims=None,
tolerations=None,
labels=None,
shared_memory=None,
Expand Down Expand Up @@ -529,6 +532,7 @@ def create_job_object(
tmpfs_size=tmpfs_size,
tmpfs_path=tmpfs_path,
persistent_volume_claims=persistent_volume_claims,
ephemeral_volume_claims=ephemeral_volume_claims,
shared_memory=shared_memory,
port=port,
qos=qos,
Expand Down
5 changes: 5 additions & 0 deletions metaflow/plugins/kubernetes/kubernetes_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ def kubernetes():
@click.option(
"--persistent-volume-claims", type=JSONTypeClass(), default=None, multiple=False
)
@click.option(
"--ephemeral-volume-claims", type=JSONTypeClass(), default=None, multiple=False
)
@click.option(
"--tolerations",
default=None,
Expand Down Expand Up @@ -156,6 +159,7 @@ def step(
tmpfs_path=None,
run_time_limit=None,
persistent_volume_claims=None,
ephemeral_volume_claims=None,
tolerations=None,
shared_memory=None,
port=None,
Expand Down Expand Up @@ -297,6 +301,7 @@ def _sync_metadata():
run_time_limit=run_time_limit,
env=env,
persistent_volume_claims=persistent_volume_claims,
ephemeral_volume_claims=ephemeral_volume_claims,
tolerations=tolerations,
shared_memory=shared_memory,
port=port,
Expand Down
18 changes: 17 additions & 1 deletion metaflow/plugins/kubernetes/kubernetes_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
KUBERNETES_NAMESPACE,
KUBERNETES_NODE_SELECTOR,
KUBERNETES_PERSISTENT_VOLUME_CLAIMS,
KUBERNETES_EPHEMERAL_VOLUME_CLAIMS,
KUBERNETES_PORT,
KUBERNETES_SERVICE_ACCOUNT,
KUBERNETES_SHARED_MEMORY,
Expand Down Expand Up @@ -102,6 +103,9 @@ class KubernetesDecorator(StepDecorator):
persistent_volume_claims : Dict[str, str], optional, default None
A map (dictionary) of persistent volumes to be mounted to the pod for this step. The map is from persistent
volumes to the path to which the volume is to be mounted, e.g., `{'pvc-name': '/path/to/mount/on'}`.
ephemeral_volume_claims: Dict[str, Any], optional, default None
A map (dictionary) of ephemeral volumes to be mounted to the pod for this step. The map is a name
to a dictionary containing the key 'path' (required), 'metadata' (optional), and 'spec' (optional).
shared_memory: int, optional
Shared memory size (in MiB) required for this step
port: int, optional
Expand Down Expand Up @@ -136,6 +140,7 @@ class KubernetesDecorator(StepDecorator):
"tmpfs_size": None,
"tmpfs_path": "/metaflow_temp",
"persistent_volume_claims": None, # e.g., {"pvc-name": "/mnt/vol", "another-pvc": "/mnt/vol2"}
"ephemeral_volume_claims": None, # e.g., {"ephemeral-name": {"path": "/mnt/vol", "spec": {"storageClassName": "my_storage_class"}}}
"shared_memory": None,
"port": None,
"compute_pool": None,
Expand Down Expand Up @@ -171,6 +176,13 @@ def init(self):
self.attributes["persistent_volume_claims"] = json.loads(
KUBERNETES_PERSISTENT_VOLUME_CLAIMS
)
if (
not self.attributes["ephemeral_volume_claims"]
and KUBERNETES_EPHEMERAL_VOLUME_CLAIMS
):
self.attributes["ephemeral_volume_claims"] = json.loads(
KUBERNETES_EPHEMERAL_VOLUME_CLAIMS
)
if not self.attributes["image_pull_policy"] and KUBERNETES_IMAGE_PULL_POLICY:
self.attributes["image_pull_policy"] = KUBERNETES_IMAGE_PULL_POLICY

Expand Down Expand Up @@ -426,7 +438,11 @@ def runtime_step_cli(
"=".join([key, str(val)]) if val else key
for key, val in v.items()
]
elif k in ["tolerations", "persistent_volume_claims"]:
elif k in [
"tolerations",
"persistent_volume_claims",
"ephemeral_volume_claims",
]:
cli_args.command_options[k] = json.dumps(v)
else:
cli_args.command_options[k] = v
Expand Down
34 changes: 33 additions & 1 deletion metaflow/plugins/kubernetes/kubernetes_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
KubernetesJobSet,
) # We need this import for Kubernetes Client.

from .kube_utils import qos_requests_and_limits
from .kube_utils import qos_requests_and_limits, VOLUME_CLAIM_TEMPLATE_DEFAULTS


class KubernetesJobException(MetaflowException):
Expand Down Expand Up @@ -205,6 +205,18 @@ def create_job_spec(self):
]
if self._kwargs["persistent_volume_claims"] is not None
else []
)
+ (
[
client.V1VolumeMount(
mount_path=vals["path"], name=name
)
for name, vals in self._kwargs[
"ephemeral_volume_claims"
].items()
]
if self._kwargs["ephemeral_volume_claims"] is not None
else []
),
)
],
Expand Down Expand Up @@ -266,6 +278,26 @@ def create_job_spec(self):
]
if self._kwargs["persistent_volume_claims"] is not None
else []
)
+ (
[
client.V1Volume(
name=name,
ephemeral=client.V1EphemeralVolumeSource(
volume_claim_template=client.V1PersistentVolumeClaimTemplate(
spec={
**VOLUME_CLAIM_TEMPLATE_DEFAULTS,
trhodeos marked this conversation as resolved.
Show resolved Hide resolved
**vals.get("spec", {}),
},
)
),
)
for name, vals in self._kwargs[
"ephemeral_volume_claims"
].items()
]
if self._kwargs["ephemeral_volume_claims"] is not None
else []
),
),
),
Expand Down
36 changes: 35 additions & 1 deletion metaflow/plugins/kubernetes/kubernetes_jobsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from metaflow.tracing import inject_tracing_vars
from metaflow.metaflow_config import KUBERNETES_SECRETS

from .kube_utils import qos_requests_and_limits
from .kube_utils import qos_requests_and_limits, VOLUME_CLAIM_TEMPLATE_DEFAULTS


class KubernetesJobsetException(MetaflowException):
Expand Down Expand Up @@ -707,6 +707,19 @@ def dump(self):
if self._kwargs["persistent_volume_claims"]
is not None
else []
)
+ (
[
client.V1VolumeMount(
mount_path=vals["path"], name=name
)
for name, vals in self._kwargs[
"ephemeral_volume_claims"
].items()
]
if self._kwargs["ephemeral_volume_claims"]
is not None
else []
),
)
],
Expand Down Expand Up @@ -772,6 +785,27 @@ def dump(self):
if self._kwargs["persistent_volume_claims"]
is not None
else []
)
+ (
[
client.V1Volume(
name=name,
ephemeral=client.V1EphemeralVolumeSource(
volume_claim_template=client.V1PersistentVolumeClaimTemplate(
spec={
**VOLUME_CLAIM_TEMPLATE_DEFAULTS,
trhodeos marked this conversation as resolved.
Show resolved Hide resolved
**vals.get("spec", {}),
},
)
),
)
for name, vals in self._kwargs[
"ephemeral_volume_claims"
].items()
]
if self._kwargs["ephemeral_volume_claims"]
is not None
else []
),
),
),
Expand Down
Loading