Skip to content

Commit

Permalink
Update __init__.py
Browse files Browse the repository at this point in the history
  • Loading branch information
mrburke00 authored Nov 14, 2024
1 parent ced0f8b commit 91e6a32
Showing 1 changed file with 31 additions and 3 deletions.
34 changes: 31 additions & 3 deletions snakemake_executor_plugin_kubernetes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ def run_job(self, job: JobExecutorInterface):
# Node selector
node_selector = {}
if "machine_type" in job.resources.keys():
# Kubernetes labels a node by its instance type using this node_label.
node_selector["node.kubernetes.io/instance-type"] = job.resources["machine_type"]
self.logger.debug(f"Set node selector for machine type: {node_selector}")

Expand Down Expand Up @@ -315,6 +316,20 @@ def run_job(self, job: JobExecutorInterface):
async def check_active_jobs(
self, active_jobs: List[SubmittedJobInfo]
) -> Generator[SubmittedJobInfo, None, None]:
# Check the status of active jobs.

# You have to iterate over the given list active_jobs.
# For jobs that have finished successfully, you have to call
# self.report_job_success(job).
# For jobs that have errored, you have to call
# self.report_job_error(job).
# Jobs that are still running have to be yielded.
#
# For queries to the remote middleware, please use
# self.status_rate_limiter like this:
#
# async with self.status_rate_limiter:
# # query remote middleware here
self.logger.debug(f"Checking status of {len(active_jobs)} jobs")
for j in active_jobs:
async with self.status_rate_limiter:
Expand Down Expand Up @@ -377,6 +392,8 @@ async def check_active_jobs(
yield j

def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]):
# Cancel all active jobs.
# This method is called when Snakemake is interrupted.
for j in active_jobs:
self._kubernetes_retry(
lambda: self.safe_delete_pod(j.external_jobid, ignore_not_found=True)
Expand Down Expand Up @@ -423,7 +440,7 @@ def unregister_secret(self):
body=kubernetes.client.V1DeleteOptions(),
)
)

# In rare cases, deleting a pod may raise a 404 NotFound error.
def safe_delete_pod(self, jobid, ignore_not_found=True):
import kubernetes.client

Expand All @@ -432,16 +449,27 @@ def safe_delete_pod(self, jobid, ignore_not_found=True):
self.kubeapi.delete_namespaced_pod(jobid, self.namespace, body=body)
except kubernetes.client.rest.ApiException as e:
if e.status == 404 and ignore_not_found:
# Can't find the pod. Maybe it's already been
# destroyed. Proceed with a warning message.
self.logger.warning(
"[WARNING] 404 not found when trying to delete the pod: {jobid}\n"
"[WARNING] Ignore this error\n".format(jobid=jobid)
)
else:
raise e

# Sometimes, certain k8s requests throw kubernetes.client.rest.ApiException.
# Solving this issue requires reauthentication, as _kubernetes_retry shows.
# However, reauthentication itself may, under rare conditions, also throw
# errors such as:
# kubernetes.client.exceptions.ApiException: (409), Reason: Conflict
#
# This error does not indicate a problem with the k8s cluster, and users can
# safely ignore it.
def _reauthenticate_and_retry(self, func=None):
import kubernetes

# Unauthorized.
# Reload config in order to ensure token is
# refreshed. Then try again.
self.logger.info("Trying to reauthenticate")
kubernetes.config.load_kube_config()
subprocess.run(["kubectl", "get", "nodes"])
Expand Down

0 comments on commit 91e6a32

Please sign in to comment.