diff --git a/stimela/backends/kube/__init__.py b/stimela/backends/kube/__init__.py index 1f460d56..655f0c5c 100644 --- a/stimela/backends/kube/__init__.py +++ b/stimela/backends/kube/__init__.py @@ -186,6 +186,7 @@ class DaskCluster(object): name: Optional[str] = None num_workers: int = 0 threads_per_worker: int = 1 + memory_limit: Optional[str] = None worker_pod: KubePodSpec = KubePodSpec() scheduler_pod: KubePodSpec = KubePodSpec() forward_dashboard_port: int = 8787 # set to non-0 to forward the http dashboard to this local port diff --git a/stimela/backends/kube/daskjob.py b/stimela/backends/kube/daskjob.py index 80e4ed11..48418b8e 100644 --- a/stimela/backends/kube/daskjob.py +++ b/stimela/backends/kube/daskjob.py @@ -27,6 +27,11 @@ def create_parser(): default="compute-runner", help="The kubernetes service account which will run the job", ) + p.add_argument( + "--pull-policy", + default="Always", + help="imagePullPolicy setting for pods", + ) p.add_argument( "-f", "--mount-file", @@ -131,7 +136,7 @@ def daskjob_template(args): } ], "image": args.image, - "imagePullPolicy": "IfNotPresent", + "imagePullPolicy": args.pull_policy, "livenessProbe": { "httpGet": { "path": "/health", @@ -184,7 +189,7 @@ def daskjob_template(args): {"name": "WORKER_ENV", "value": "hello-world"} ], "image": args.image, - "imagePullPolicy": "IfNotPresent", + "imagePullPolicy": args.pull_policy, "name": "worker", } ], @@ -198,7 +203,7 @@ def daskjob_template(args): "containers": [ { "image": args.image, - "imagePullPolicy": "IfNotPresent", + "imagePullPolicy": args.pull_policy, "name": "job", } ], diff --git a/stimela/backends/kube/run_kube.py b/stimela/backends/kube/run_kube.py index c32c12ff..8b0b0fc9 100644 --- a/stimela/backends/kube/run_kube.py +++ b/stimela/backends/kube/run_kube.py @@ -137,7 +137,9 @@ def dprint(level, *args): labels=pod_labels, namespace=namespace, image=image_name, - memory_limit=kube.dask_cluster.worker_pod.memory and kube.dask_cluster.worker_pod.memory.limit, + pull_policy='Always' if kube.always_pull_images else 'IfNotPresent', + memory_limit=kube.dask_cluster.memory_limit if kube.dask_cluster.memory_limit is not None + else kube.dask_cluster.worker_pod.memory and kube.dask_cluster.worker_pod.memory.limit, nworkers=kube.dask_cluster.num_workers, threads_per_worker=kube.dask_cluster.threads_per_worker, # cmdline=["/bin/sh", "-c", "while true;do date;sleep 5; done"],