From aef617c7891a32a3b2311f3b452ed65415daa807 Mon Sep 17 00:00:00 2001 From: Vladyslav Moisieienkov Date: Thu, 16 Dec 2021 18:15:41 +0100 Subject: [PATCH] executor, poll job statuses instead of checking files - poll job-controller for job statuses instead of using files closes #33 --- CHANGES.rst | 5 ++ reana_workflow_engine_snakemake/config.py | 37 ++++++++ reana_workflow_engine_snakemake/executor.py | 96 +++++++++++++-------- 3 files changed, 104 insertions(+), 34 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index 021baee..ab33697 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,6 +1,11 @@ Changes ======= +Version 0.8.1 (UNRELEASED) +--------------------------- + +- Adds jobs status polling. + Version 0.8.0 (2021-11-22) --------------------------- diff --git a/reana_workflow_engine_snakemake/config.py b/reana_workflow_engine_snakemake/config.py index 4b99137..4efc8e8 100644 --- a/reana_workflow_engine_snakemake/config.py +++ b/reana_workflow_engine_snakemake/config.py @@ -9,6 +9,7 @@ """REANA Workflow Engine Snakemake configuration.""" import os +from enum import Enum MOUNT_CVMFS = os.getenv("REANA_MOUNT_CVMFS", "false") @@ -20,3 +21,39 @@ SNAKEMAKE_MAX_PARALLEL_JOBS = 100 """Snakemake maximum number of jobs that can run in parallel.""" + +POLL_JOBS_STATUS_SLEEP_IN_SECONDS = 10 +"""Time to sleep between polling for job status.""" + + +# defined in reana-db component, in reana_db/models.py file as JobStatus +class JobStatus(str, Enum): + """Enumeration of job statuses. 
+ + Example: + JobStatus.started == "started" # True + """ + + # FIXME: this state is not defined in reana-db but returned by r-job-controller + started = "started" + + created = "created" + running = "running" + finished = "finished" + failed = "failed" + stopped = "stopped" + queued = "queued" + + +# defined in reana-db component, in reana_db/models.py file as RunStatus +class RunStatus(Enum): + """Enumeration of possible run statuses of a workflow.""" + + created = 0 + running = 1 + finished = 2 + failed = 3 + deleted = 4 + stopped = 5 + queued = 6 + pending = 7 diff --git a/reana_workflow_engine_snakemake/executor.py b/reana_workflow_engine_snakemake/executor.py index 657caff..8dc7325 100644 --- a/reana_workflow_engine_snakemake/executor.py +++ b/reana_workflow_engine_snakemake/executor.py @@ -11,6 +11,7 @@ import os import subprocess import logging +import time from collections import namedtuple from typing import Callable @@ -26,6 +27,9 @@ LOGGING_MODULE, MOUNT_CVMFS, SNAKEMAKE_MAX_PARALLEL_JOBS, + POLL_JOBS_STATUS_SLEEP_IN_SECONDS, + JobStatus, + RunStatus, ) from reana_workflow_engine_snakemake.utils import ( publish_job_submission, @@ -79,7 +83,7 @@ def run( job_request_body = { "workflow_uuid": workflow_uuid, "image": container_image, - "cmd": f"cd {workflow_workspace} && {job.shellcmd} && touch {jobfinished} || (touch {jobfailed}; exit 1)", + "cmd": f"cd {workflow_workspace} && {job.shellcmd}", "prettified_cmd": job.shellcmd, "workflow_workspace": workflow_workspace, "job_name": job.name, @@ -116,22 +120,6 @@ def run( # it would immediately check if the output files are present # and fail otherwise (3 sec timeout). - if job.is_norun: - job_id = "all" - # Manually create the jobfinished for the root rule (`all`) - # to mark it as successful. 
- try: - subprocess.check_output( - f"touch {jobfinished}", shell=True, - ) - except subprocess.CalledProcessError as ex: - log.error( - "Error creating `all` jobfinished file (exit code {}):\n{}".format( - ex.returncode, ex.output.decode() - ) - ) - error_callback(job) - return with self.lock: self.active_jobs.append( REANAClusterJob( @@ -145,7 +133,8 @@ def run( ) ) - def _get_container_image(self, job: Job) -> str: + @staticmethod + def _get_container_image(job: Job) -> str: if job.container_img_url: container_image = job.container_img_url.replace("docker://", "") log.info(f"Environment: {container_image}") @@ -154,36 +143,79 @@ def _get_container_image(self, job: Job) -> str: log.info(f"No environment specified, falling back to: {container_image}") return container_image - def _handle_job_status(self, job, status): + def _handle_job_status( + self, job, job_status: JobStatus, workflow_status: RunStatus + ) -> None: workflow_uuid = os.getenv("workflow_uuid", "default") job_id = job.reana_job_id - log.info(f"{status} job: {job_id}") + log.info(f"{job_status} job: {job_id}") message = { "progress": build_progress_message( - **{status: {"total": 1, "job_ids": [job_id]}} + **{job_status.name: {"total": 1, "job_ids": [job_id]}} ) } - status_running = 1 - status_failed = 3 - status_mapping = {"finished": status_running, "failed": status_failed} self.publisher.publish_workflow_status( - workflow_uuid, status_mapping[status], message=message + workflow_uuid, workflow_status.value, message=message ) - def handle_job_success(self, job): + def handle_job_success(self, job) -> None: """Override job success method to publish job status.""" # override handle_touch = True, to enable `touch()` in Snakefiles super(ClusterExecutor, self).handle_job_success( - job, upload_remote=False, handle_log=False, handle_touch=True + job, upload_remote=False, handle_log=False, handle_touch=False ) - self._handle_job_status(job, "finished") + self._handle_job_status( + job, 
job_status=JobStatus.finished, workflow_status=RunStatus.running + ) - def handle_job_error(self, job): + def handle_job_error(self, job) -> None: """Override job error method to publish job status.""" super().handle_job_error(job) - self._handle_job_status(job, "failed") + self._handle_job_status( + job, job_status=JobStatus.failed, workflow_status=RunStatus.failed + ) + + def _get_job_status_from_controller(self, job_id: str) -> str: + response = self.rjc_api_client.check_status(job_id) + return response["status"] + + def _wait_for_jobs(self): + """Override _wait_for_jobs method to poll job-controller for job statuses. + + Original _wait_for_jobs method was checking .jobfinished or .jobfailed files. + """ + while True: + with self.lock: + if not self.wait: + return + active_jobs = self.active_jobs + self.active_jobs = [] + still_running = [] + + for active_job in active_jobs: + job_id = active_job.job.reana_job_id + + try: + status = self._get_job_status_from_controller(job_id) + except Exception as error: + log.error( + f"Error getting status of job with id {job_id}. Details: {error}" + ) + status = JobStatus.failed + + if status == JobStatus.finished or active_job.job.is_norun: + active_job.callback(active_job.job) + elif status == JobStatus.failed: + active_job.error_callback(active_job.job) + else: + still_running.append(active_job) + + with self.lock: + self.active_jobs.extend(still_running) # don't drop newly queued jobs + + time.sleep(POLL_JOBS_STATUS_SLEEP_IN_SECONDS) def submit_job(rjc_api_client, publisher, job_request_body): @@ -199,10 +231,6 @@ def submit_job(rjc_api_client, publisher, job_request_body): ) return job_id - # FIXME: Call `job_status = poll_job_status(rjc_api_client, job_id)` instead of - # checking job success/failure via `jobfinished`/`jobfailed` files in .snakemake? - # In that case we would probably need to implement our own `_wait_for_jobs` method. - def run_jobs( rjc_api_client,