From d855d1291e4010833f761b74dd79ce10244aacad Mon Sep 17 00:00:00 2001 From: Marco Donadoni Date: Tue, 23 Jan 2024 11:25:21 +0100 Subject: [PATCH] feat(shutdown): stop all running jobs before stopping workflow (#423) Make sure that all the running jobs are stopped before stopping the run-batch pod, as otherwise they will continue running and they will not be cleaned up. Closes reanahub/reana-workflow-controller#546 --- reana_job_controller/job_monitor.py | 10 +- .../kubernetes_job_manager.py | 2 + reana_job_controller/rest.py | 140 ++++++++++++++++++ 3 files changed, 149 insertions(+), 3 deletions(-) diff --git a/reana_job_controller/job_monitor.py b/reana_job_controller/job_monitor.py index c465ce6a..6a3d3acb 100644 --- a/reana_job_controller/job_monitor.py +++ b/reana_job_controller/job_monitor.py @@ -113,7 +113,11 @@ def should_process_job(self, job_pod) -> bool: https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1Pod.md) """ remaining_jobs = self._get_remaining_jobs( - statuses_to_skip=[JobStatus.finished.name, JobStatus.failed.name] + statuses_to_skip=[ + JobStatus.finished.name, + JobStatus.failed.name, + JobStatus.stopped.name, + ] ) backend_job_id = self.get_backend_job_id(job_pod) is_job_in_remaining_jobs = backend_job_id in remaining_jobs @@ -291,7 +295,7 @@ def watch_jobs(self, job_db, app): :param job_db: Dictionary which contains all current jobs. 
""" ignore_hold_codes = [35, 16] - statuses_to_skip = ["finished", "failed"] + statuses_to_skip = ["finished", "failed", "stopped"] while True: try: logging.info("Starting a new stream request to watch Condor Jobs") @@ -422,7 +426,7 @@ def watch_jobs(self, job_db, app=None): banner_timeout=SLURM_SSH_BANNER_TIMEOUT, auth_timeout=SLURM_SSH_AUTH_TIMEOUT, ) - statuses_to_skip = ["finished", "failed"] + statuses_to_skip = ["finished", "failed", "stopped"] while True: logging.debug("Starting a new stream request to watch Jobs") try: diff --git a/reana_job_controller/kubernetes_job_manager.py b/reana_job_controller/kubernetes_job_manager.py index ececd56e..146a1d37 100644 --- a/reana_job_controller/kubernetes_job_manager.py +++ b/reana_job_controller/kubernetes_job_manager.py @@ -172,6 +172,8 @@ def execute(self): "initContainers": [], "volumes": [], "restartPolicy": "Never", + # No need to wait a long time for jobs to gracefully terminate + "terminationGracePeriodSeconds": 5, "enableServiceLinks": False, }, }, diff --git a/reana_job_controller/rest.py b/reana_job_controller/rest.py index 1fdbd42c..763cfad7 100644 --- a/reana_job_controller/rest.py +++ b/reana_job_controller/rest.py @@ -11,6 +11,7 @@ import copy import json import logging +import threading from flask import Blueprint, current_app, jsonify, request from sqlalchemy.exc import OperationalError @@ -19,6 +20,9 @@ REANAKubernetesWrongMemoryFormat, ) +from reana_db.models import JobStatus + + from reana_job_controller.errors import ComputingBackendSubmissionError from reana_job_controller.job_db import ( JOB_DB, @@ -28,9 +32,13 @@ retrieve_backend_job_id, retrieve_job, retrieve_job_logs, + store_job_logs, + update_job_status, ) from reana_job_controller.schemas import Job, JobRequest from reana_job_controller.utils import update_workflow_logs +from reana_job_controller import config + blueprint = Blueprint("jobs", __name__) @@ -38,6 +46,48 @@ job_schema = Job() +class JobCreationCondition: + """Mechanism used to 
synchronize the creation of jobs. + + This is used to make sure no thread is able to create new jobs during or after + the shutdown procedure, as otherwise some jobs might not be correctly stopped and + cleaned up. Jobs can still be created in parallel. This works similarly to a RW-lock. + """ + + def __init__(self): + """Initialise a new JobCreationCondition.""" + self.condition = threading.Condition() + self.creation_permitted = True + self.ongoing_creations = 0 + """Keep track of the number of ongoing job creations""" + + def start_creation(self) -> bool: + """Check if a new job can be created.""" + with self.condition: + if not self.creation_permitted: + return False + self.ongoing_creations += 1 + return True + + def stop_creation(self): + """Notify that the creation of the job has finished.""" + with self.condition: + self.ongoing_creations -= 1 + if self.ongoing_creations == 0: + self.condition.notify_all() + + def disable_creation(self): + """Do not permit to create any new jobs.""" + with self.condition: + # wait until all job creations are finished + while self.ongoing_creations != 0: + self.condition.wait() + self.creation_permitted = False + + +job_creation_condition = JobCreationCondition() + + @blueprint.route("/job_cache", methods=["GET"]) def check_if_cached(): r"""Check if job is cached. @@ -227,6 +277,10 @@ def create_job(): # noqa return jsonify({"message": e.message}), 403 except REANAKubernetesWrongMemoryFormat as e: return jsonify({"message": e.message}), 400 + + if not job_creation_condition.start_creation(): + return jsonify({"message": "Cannot create new jobs, shutting down"}), 400 + try: backend_jod_id = job_obj.execute() except OperationalError as e: @@ -237,6 +291,9 @@ def create_job(): # noqa msg = f"Job submission failed. 
\n{e}" logging.error(msg, exc_info=True) return jsonify({"message": msg}), 500 + finally: + job_creation_condition.stop_creation() + if job_obj: job = copy.deepcopy(job_request) job["status"] = "started" @@ -419,6 +476,89 @@ def delete_job(job_id): # noqa return jsonify({"message": "The job {} doesn't exist".format(job_id)}), 404 +@blueprint.route("/shutdown", methods=["GET"]) +def shutdown(): + r"""Stop reana-job-controller. + + All running jobs will be stopped and no more jobs will be scheduled. + Kubernetes will call this endpoint before stopping the pod (PreStop hook). + --- + get: + summary: Stop reana-job-controller + description: >- + All running jobs will be stopped and no more jobs will be scheduled. + Kubernetes will call this endpoint before stopping the pod (PreStop hook). + operationId: shutdown + consumes: + - application/json + produces: + - application/json + responses: + 200: + description: >- + Request successful. All jobs were stopped. + schema: + type: object + properties: + message: + type: string + examples: + application/json: + {"message": "All jobs stopped."} + 500: + description: >- + Request failed. Something went wrong while stopping the jobs. + schema: + type: object + properties: + message: + type: string + examples: + application/json: + {"message": "Could not stop jobs cdcf48b1-c2f3-4693-8230-b066e088444c"} + """ + logging.info("Starting shutdown") + job_creation_condition.disable_creation() + + # Now no more jobs can be scheduled, let's stop all of the others. + + jobs = retrieve_all_jobs() + failed_to_stop = [] + + # jobs is a list of dicts, where each dict has a single entry. + # the key of the dict is the job ID, the value contains the job details. 
+ for job_dict in jobs: + for job_id, job in job_dict.items(): + if job["status"] in ("finished", "failed", "stopped"): + continue + + backend_job_id = retrieve_backend_job_id(job_id) + # FIXME: ideally we would not be accessing the database manually here + # to get the compute backend and the workspace, but this can wait for a general + # refactor of the "in-memory" database + compute_backend = JOB_DB[job_id]["compute_backend"] + workspace = JOB_DB[job_id]["obj"].workflow_workspace + job_manager_cls = config.COMPUTE_BACKENDS[compute_backend]() + logging.info(f"Stopping job {job_id} ({backend_job_id})") + try: + logs = job_manager_cls.get_logs(backend_job_id, workspace=workspace) + store_job_logs(job_id, logs) + job_manager_cls.stop(backend_job_id) + update_job_status(job_id, JobStatus.stopped.name) + # FIXME: ideally also here we would not access the database directly + JOB_DB[job_id]["deleted"] = True + except Exception: + logging.exception(f"Could not stop job {job_id} ({backend_job_id})") + failed_to_stop.append((job_id)) + + if failed_to_stop: + return ( + jsonify({"message": "Could not stop jobs " + ", ".join(failed_to_stop)}), + 500, + ) + return jsonify({"message": "All jobs stopped."}), 200 + + @blueprint.route("/apispec", methods=["GET"]) def get_openapi_spec(): """Get OpenAPI Spec."""