From 91e6a321c08b75990b4ab45c13eabdc6b11bf0b5 Mon Sep 17 00:00:00 2001
From: Devin Burke <46036921+mrburke00@users.noreply.github.com>
Date: Thu, 14 Nov 2024 14:18:46 -0700
Subject: [PATCH] Update __init__.py

---
 .../__init__.py | 34 +++++++++++++++++--
 1 file changed, 31 insertions(+), 3 deletions(-)

diff --git a/snakemake_executor_plugin_kubernetes/__init__.py b/snakemake_executor_plugin_kubernetes/__init__.py
index 5e148e9..ddbec0c 100644
--- a/snakemake_executor_plugin_kubernetes/__init__.py
+++ b/snakemake_executor_plugin_kubernetes/__init__.py
@@ -185,6 +185,7 @@ def run_job(self, job: JobExecutorInterface):
         # Node selector
         node_selector = {}
         if "machine_type" in job.resources.keys():
+            # Kubernetes labels each node with its instance type under this well-known label.
             node_selector["node.kubernetes.io/instance-type"] = job.resources["machine_type"]
             self.logger.debug(f"Set node selector for machine type: {node_selector}")
@@ -315,6 +316,20 @@ def run_job(self, job: JobExecutorInterface):
     async def check_active_jobs(
         self, active_jobs: List[SubmittedJobInfo]
     ) -> Generator[SubmittedJobInfo, None, None]:
+        # Check the status of active jobs.
+
+        # Iterate over the given list active_jobs.
+        # For jobs that have finished successfully, call
+        # self.report_job_success(job).
+        # For jobs that have errored, call
+        # self.report_job_error(job).
+        # Jobs that are still running must be yielded.
+        #
+        # For queries to the remote middleware, use
+        # self.status_rate_limiter like this:
+        #
+        # async with self.status_rate_limiter:
+        #     # query remote middleware here
         self.logger.debug(f"Checking status of {len(active_jobs)} jobs")
         for j in active_jobs:
             async with self.status_rate_limiter:
@@ -377,6 +392,8 @@ async def check_active_jobs(
         yield j

     def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]):
+        # Cancel all active jobs.
+        # This method is called when Snakemake is interrupted.
         for j in active_jobs:
             self._kubernetes_retry(
                 lambda: self.safe_delete_pod(j.external_jobid, ignore_not_found=True)
             )
@@ -423,7 +440,7 @@ def unregister_secret(self):
                 body=kubernetes.client.V1DeleteOptions(),
             )
         )
-
+    # In rare cases, deleting a pod may raise a 404 NotFound error.
     def safe_delete_pod(self, jobid, ignore_not_found=True):
         import kubernetes.client
@@ -432,16 +449,27 @@ def safe_delete_pod(self, jobid, ignore_not_found=True):
         try:
             self.kubeapi.delete_namespaced_pod(jobid, self.namespace, body=body)
         except kubernetes.client.rest.ApiException as e:
             if e.status == 404 and ignore_not_found:
+                # Can't find the pod. Maybe it has already been
+                # destroyed. Proceed with a warning message.
                 self.logger.warning(
                     "[WARNING] 404 not found when trying to delete the pod: {jobid}\n"
                     "[WARNING] Ignore this error\n".format(jobid=jobid)
                 )
             else:
                 raise e
-
+    # Certain k8s requests can throw kubernetes.client.rest.ApiException.
+    # Recovering requires reauthentication, as _kubernetes_retry shows.
+    # However, reauthentication itself may, under rare conditions, also throw
+    # errors such as:
+    #     kubernetes.client.exceptions.ApiException: (409), Reason: Conflict
+    #
+    # Such an error does not indicate a problem with the k8s cluster, and
+    # users can safely ignore it.
     def _reauthenticate_and_retry(self, func=None):
         import kubernetes
-
+        # Unauthorized.
+        # Reload the config to ensure the token is
+        # refreshed, then try again.
         self.logger.info("Trying to reauthenticate")
         kubernetes.config.load_kube_config()
         subprocess.run(["kubectl", "get", "nodes"])
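
Note on the machine_type hunk: the resource that drives the node.kubernetes.io/instance-type selector is declared per rule in the workflow's Snakefile. A minimal sketch follows; the rule name and instance type are illustrative, and valid values depend on your cluster's node pools:

    rule count_lines:
        input:
            "data/input.txt"
        output:
            "results/line_count.txt"
        resources:
            # Matched against each node's node.kubernetes.io/instance-type label.
            machine_type="n1-standard-4"
        shell:
            "wc -l {input} > {output}"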
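
Note on the check_active_jobs hunk: the added comments describe the executor-plugin polling contract (report finished jobs, yield running ones, throttle queries to the API). Below is a self-contained sketch of that contract in plain asyncio; RateLimiter, get_phase, and the report callbacks are hypothetical stand-ins, not the plugin's actual API:

    import asyncio

    class RateLimiter:
        # Stand-in for self.status_rate_limiter: an async context manager
        # that throttles queries to the remote middleware.
        async def __aenter__(self):
            await asyncio.sleep(0.1)
            return self

        async def __aexit__(self, *exc):
            return False

    async def check_active_jobs(active_jobs, get_phase, report_success, report_error):
        limiter = RateLimiter()
        for job in active_jobs:
            async with limiter:
                phase = get_phase(job)  # e.g. the pod's status.phase
            if phase == "Succeeded":
                report_success(job)
            elif phase == "Failed":
                report_error(job)
            else:
                yield job  # still running: hand back for the next poll

    async def main():
        phases = {"job-a": "Succeeded", "job-b": "Running", "job-c": "Failed"}
        running = [
            j
            async for j in check_active_jobs(
                list(phases),
                phases.get,
                lambda j: print(f"{j}: success"),
                lambda j: print(f"{j}: error"),
            )
        ]
        print("still running:", running)

    asyncio.run(main())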
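
Note on the _reauthenticate_and_retry hunks: the added comments document a 409 Conflict that can surface while reauthenticating. A sketch of the retry-with-reauthentication pattern they describe; retry_with_reauth is illustrative, not the plugin's _kubernetes_retry itself:

    import kubernetes.config
    from kubernetes.client.rest import ApiException

    def retry_with_reauth(func):
        try:
            return func()
        except ApiException as e:
            if e.status == 401:
                # Unauthorized: reload the kubeconfig so the token is
                # refreshed, then retry the original call once.
                kubernetes.config.load_kube_config()
                return func()
            if e.status == 409:
                # Conflict raised during reauthentication; per the comments
                # in the patch, this is harmless and can be ignored.
                return None
            raise

Wrapping the whole call rather than the authentication step keeps the retry decision in one place, which is the same shape the plugin's cancel_jobs hunk relies on when it passes a lambda to _kubernetes_retry.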