diff --git a/AUTHORS.rst b/AUTHORS.rst
index 9407369..9ee92eb 100644
--- a/AUTHORS.rst
+++ b/AUTHORS.rst
@@ -7,3 +7,4 @@ The list of contributors in alphabetical order:
 - `Marco Vidal `_
 - `Sinclert Perez `_
 - `Tibor Simko `_
+- `Vladyslav Moisieienkov `_
diff --git a/CHANGES.rst b/CHANGES.rst
index 021baee..c98ae2b 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,6 +1,11 @@
 Changes
 =======
 
+Version 0.8.1 (UNRELEASED)
+--------------------------
+
+- Adds polling of job-controller to determine job statuses instead of checking files.
+
 Version 0.8.0 (2021-11-22)
 --------------------------
 
diff --git a/reana_workflow_engine_snakemake/config.py b/reana_workflow_engine_snakemake/config.py
index 4b99137..bb31af6 100644
--- a/reana_workflow_engine_snakemake/config.py
+++ b/reana_workflow_engine_snakemake/config.py
@@ -9,6 +9,7 @@
 """REANA Workflow Engine Snakemake configuration."""
 
 import os
+from enum import Enum
 
 MOUNT_CVMFS = os.getenv("REANA_MOUNT_CVMFS", "false")
@@ -20,3 +21,39 @@
 
 SNAKEMAKE_MAX_PARALLEL_JOBS = 100
 """Snakemake maximum number of jobs that can run in parallel."""
+
+POLL_JOBS_STATUS_SLEEP_IN_SECONDS = 10
+"""Time to sleep between polling for job status."""
+
+
+# Defined in the reana-db component (reana_db/models.py) as JobStatus.
+class JobStatus(Enum):
+    """Enumeration of job statuses.
+
+    Example:
+        JobStatus.started.name == "started"  # True
+    """
+
+    # FIXME: this state is not defined in reana-db but is returned by r-job-controller
+    started = 6
+
+    created = 0
+    running = 1
+    finished = 2
+    failed = 3
+    stopped = 4
+    queued = 5
+
+
+# Defined in the reana-db component (reana_db/models.py) as RunStatus.
+class RunStatus(Enum):
+    """Enumeration of possible run statuses of a workflow."""
+
+    created = 0
+    running = 1
+    finished = 2
+    failed = 3
+    deleted = 4
+    stopped = 5
+    queued = 6
+    pending = 7
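Note (not part of the patch): a minimal sketch of how the two new enums are meant to work together, mirroring the executor changes below; the `status_name` variable is a hypothetical stand-in for a status string returned by reana-job-controller:

    from reana_workflow_engine_snakemake.config import JobStatus, RunStatus

    status_name = "finished"  # e.g. as reported by reana-job-controller

    # Look the string up as a JobStatus member; `.name` round-trips back to the
    # string used as the progress-message key, `.value` is the numeric code.
    job_status = JobStatus[status_name]
    assert job_status.name == "finished" and job_status.value == 2

    # A failed job marks the whole workflow as failed; a finished job keeps it running.
    workflow_status = RunStatus.failed if job_status == JobStatus.failed else RunStatus.running
    assert workflow_status == RunStatus.running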
diff --git a/reana_workflow_engine_snakemake/executor.py b/reana_workflow_engine_snakemake/executor.py
index 657caff..38d1fcf 100644
--- a/reana_workflow_engine_snakemake/executor.py
+++ b/reana_workflow_engine_snakemake/executor.py
@@ -1,7 +1,7 @@
 # -*- coding: utf-8 -*-
 #
 # This file is part of REANA.
-# Copyright (C) 2021 CERN.
+# Copyright (C) 2021, 2022 CERN.
 #
 # REANA is free software; you can redistribute it and/or modify it
 # under the terms of the MIT License; see LICENSE file for more details.
@@ -9,11 +9,12 @@
 """REANA-Workflow-Engine-Snakemake executor."""
 
 import os
-import subprocess
 import logging
+import time
 from collections import namedtuple
 from typing import Callable
 
+from bravado.exception import HTTPNotFound
 from reana_commons.config import REANA_DEFAULT_SNAKEMAKE_ENV_IMAGE
 from reana_commons.utils import build_progress_message
 from snakemake import snakemake
@@ -26,20 +27,19 @@
     LOGGING_MODULE,
     MOUNT_CVMFS,
     SNAKEMAKE_MAX_PARALLEL_JOBS,
+    POLL_JOBS_STATUS_SLEEP_IN_SECONDS,
+    JobStatus,
+    RunStatus,
 )
 from reana_workflow_engine_snakemake.utils import (
     publish_job_submission,
     publish_workflow_start,
 )
 
-
 log = logging.getLogger(LOGGING_MODULE)
 
 
-REANAClusterJob = namedtuple(
-    "REANAClusterJob",
-    "job jobid callback error_callback jobscript jobfinished jobfailed",
-)
+REANAClusterJob = namedtuple("REANAClusterJob", "job callback error_callback")
 
 
 class REANAClusterExecutor(GenericClusterExecutor):
@@ -54,16 +54,6 @@ def run(
     ):
         """Override GenericClusterExecutor run method."""
         super()._run(job)
-        jobid = job.jobid
-
-        # Files needed for Snakemake (`GenericClusterExecutor._wait_for_jobs`)
-        # to check if a job finished successfully.
-        jobscript = self.get_jobscript(job)
-        jobfinished = os.path.join(self.tmpdir, "{}.jobfinished".format(jobid))
-        jobfailed = os.path.join(self.tmpdir, "{}.jobfailed".format(jobid))
-        self.write_jobscript(
-            job, jobscript, jobfinished=jobfinished, jobfailed=jobfailed
-        )
 
         workflow_workspace = os.getenv("workflow_workspace", "default")
         workflow_uuid = os.getenv("workflow_uuid", "default")
@@ -79,7 +69,7 @@ def run(
             job_request_body = {
                 "workflow_uuid": workflow_uuid,
                 "image": container_image,
-                "cmd": f"cd {workflow_workspace} && {job.shellcmd} && touch {jobfinished} || (touch {jobfailed}; exit 1)",
+                "cmd": f"cd {workflow_workspace} && {job.shellcmd}",
                 "prettified_cmd": job.shellcmd,
                 "workflow_workspace": workflow_workspace,
                 "job_name": job.name,
@@ -112,40 +102,12 @@ def run(
             log.error(f"Error submitting job {job.name}: {excep}")
             error_callback(job)
             return
-        # We don't need to call `submit_callback(job)` manually since
-        # it would immediately check if the output files are present
-        # and fail otherwise (3 sec timeout).
-
-        if job.is_norun:
-            job_id = "all"
-            # Manually create the jobfinished for the root rule (`all`)
-            # to mark it as successful.
-            try:
-                subprocess.check_output(
-                    f"touch {jobfinished}", shell=True,
-                )
-            except subprocess.CalledProcessError as ex:
-                log.error(
-                    "Error creating `all` jobfinished file (exit code {}):\n{}".format(
-                        ex.returncode, ex.output.decode()
-                    )
-                )
-                error_callback(job)
-                return
+
         with self.lock:
-            self.active_jobs.append(
-                REANAClusterJob(
-                    job,
-                    job_id,
-                    callback,
-                    error_callback,
-                    jobscript,
-                    jobfinished,
-                    jobfailed,
-                )
-            )
+            self.active_jobs.append(REANAClusterJob(job, callback, error_callback))
 
-    def _get_container_image(self, job: Job) -> str:
+    @staticmethod
+    def _get_container_image(job: Job) -> str:
         if job.container_img_url:
             container_image = job.container_img_url.replace("docker://", "")
             log.info(f"Environment: {container_image}")
@@ -154,36 +116,97 @@ def _get_container_image(self, job: Job) -> str:
             log.info(f"No environment specified, falling back to: {container_image}")
         return container_image
 
-    def _handle_job_status(self, job, status):
+    def _handle_job_status(
+        self, job: Job, job_status: JobStatus, workflow_status: RunStatus
+    ) -> None:
         workflow_uuid = os.getenv("workflow_uuid", "default")
         job_id = job.reana_job_id
-        log.info(f"{status} job: {job_id}")
+        log.info(f"{job_status} job: {job_id}")
         message = {
             "progress": build_progress_message(
-                **{status: {"total": 1, "job_ids": [job_id]}}
+                **{job_status.name: {"total": 1, "job_ids": [job_id]}}
             )
         }
-        status_running = 1
-        status_failed = 3
-        status_mapping = {"finished": status_running, "failed": status_failed}
         self.publisher.publish_workflow_status(
-            workflow_uuid, status_mapping[status], message=message
+            workflow_uuid, workflow_status.value, message=message
         )
 
-    def handle_job_success(self, job):
+    def handle_job_success(self, job: Job) -> None:
         """Override job success method to publish job status."""
         # override handle_touch = True, to enable `touch()` in Snakefiles
+        # `touch()` is responsible for checking that output files exist
         super(ClusterExecutor, self).handle_job_success(
             job, upload_remote=False, handle_log=False, handle_touch=True
         )
-        self._handle_job_status(job, "finished")
+        self._handle_job_status(
+            job, job_status=JobStatus.finished, workflow_status=RunStatus.running
+        )
 
-    def handle_job_error(self, job):
+    def handle_job_error(self, job: Job) -> None:
         """Override job error method to publish job status."""
         super().handle_job_error(job)
-        self._handle_job_status(job, "failed")
+        self._handle_job_status(
+            job, job_status=JobStatus.failed, workflow_status=RunStatus.failed
+        )
+
+    def _get_job_status_from_controller(self, job_id: str) -> str:
+        """Get the job status from the job-controller.
+
+        If an error occurs, return the `failed` status.
+        """
+        try:
+            response = self.rjc_api_client.check_status(job_id)
+        except HTTPNotFound:
+            log.error(
+                f"Job {job_id} was not found in job-controller. Returning failed job status."
+            )
+            return JobStatus.failed.name
+        except Exception as exception:
+            log.error(
+                f"Error getting status of job with id {job_id}. Returning failed job status. Details: {exception}"
+            )
+            return JobStatus.failed.name
+
+        try:
+            return response.status
+        except AttributeError:
+            log.error(
+                f"job-controller response for job {job_id} does not contain a 'status' field. Returning failed job status. "
+                f"Response: {response}"
+            )
+            return JobStatus.failed.name
+
+    def _wait_for_jobs(self):
+        """Override the _wait_for_jobs method to poll job-controller for job statuses.
+
+        The original GenericClusterExecutor._wait_for_jobs method checks success/failure via .jobfinished or .jobfailed files.
+        """
+        while True:
+            with self.lock:
+                if not self.wait:
+                    return
+                active_jobs = self.active_jobs
+                self.active_jobs = []
+                still_running = []
+
+            for active_job in active_jobs:
+                job_id = active_job.job.reana_job_id
+
+                status = self._get_job_status_from_controller(job_id)
+
+                if status == JobStatus.finished.name or active_job.job.is_norun:
+                    active_job.callback(active_job.job)
+                elif status == JobStatus.failed.name:
+                    active_job.error_callback(active_job.job)
+                else:
+                    still_running.append(active_job)
+
+            with self.lock:
+                self.active_jobs = still_running
+
+            time.sleep(POLL_JOBS_STATUS_SLEEP_IN_SECONDS)
 
 
 def submit_job(rjc_api_client, publisher, job_request_body):
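Note (not part of the patch): a minimal sketch of how the new `_get_job_status_from_controller` method could be exercised with a mocked job-controller client, assuming the package and its dependencies are importable; the stand-in executor object and the job id are hypothetical:

    from types import SimpleNamespace
    from unittest.mock import Mock

    from reana_workflow_engine_snakemake.executor import REANAClusterExecutor

    # Stand-in executor exposing only the attribute the method needs.
    fake_executor = SimpleNamespace(rjc_api_client=Mock())
    fake_executor.rjc_api_client.check_status.return_value = SimpleNamespace(status="finished")

    status = REANAClusterExecutor._get_job_status_from_controller(fake_executor, "job-123")
    assert status == "finished"

    # A response without a `status` field falls back to the failed status.
    fake_executor.rjc_api_client.check_status.return_value = object()
    status = REANAClusterExecutor._get_job_status_from_controller(fake_executor, "job-123")
    assert status == "failed"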
@@ -191,7 +214,7 @@ def submit_job(rjc_api_client, publisher, job_request_body):
     response = rjc_api_client.submit(**job_request_body)
     job_id = str(response["job_id"])
-    log.info("submitted job:{0}".format(job_id))
+    log.info(f"submitted job: {job_id}")
     publish_job_submission(
         workflow_uuid=job_request_body["workflow_uuid"],
         publisher=publisher,
@@ -199,10 +222,6 @@ def submit_job(rjc_api_client, publisher, job_request_body):
     )
     return job_id
 
-    # FIXME: Call `job_status = poll_job_status(rjc_api_client, job_id)` instead of
-    # checking job success/failure via `jobfinished`/`jobfailed` files in .snakemake?
-    # In that case we would probably need to implement our own `_wait_for_jobs` method.
-
 
 def run_jobs(
     rjc_api_client,
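Note (not part of the patch): the `_wait_for_jobs` override above uses the usual swap-under-lock polling pattern: take the batch of active jobs while holding the lock, poll each job outside the lock, and put the still-running ones back before sleeping. A stripped-down, self-contained illustration of that pattern, with hypothetical names throughout:

    import threading
    import time

    lock = threading.Lock()
    active_jobs = ["job-1", "job-2"]  # placeholder job handles
    attempts = {}

    def poll(job):
        """Placeholder for a real status query, e.g. a job-controller call."""
        attempts[job] = attempts.get(job, 0) + 1
        return "finished" if attempts[job] >= 2 else "running"

    while active_jobs:
        with lock:
            batch, active_jobs = active_jobs, []  # take the batch under the lock
        still_running = [j for j in batch if poll(j) != "finished"]  # poll outside the lock
        with lock:
            active_jobs.extend(still_running)  # put unfinished jobs back
        if active_jobs:
            time.sleep(0.1)  # POLL_JOBS_STATUS_SLEEP_IN_SECONDS plays this role above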