From 8c8172aceedf3bd7ac5ee2173b8bc0e4abfeda5c Mon Sep 17 00:00:00 2001 From: Vladyslav Moisieienkov Date: Thu, 16 Dec 2021 18:15:41 +0100 Subject: [PATCH] executor, add job status polling closes #33 --- CHANGES.rst | 5 +++ reana_workflow_engine_snakemake/config.py | 34 +++++++++++++++ reana_workflow_engine_snakemake/executor.py | 46 +++++++++++++++++---- 3 files changed, 77 insertions(+), 8 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index 021baee..ab33697 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,6 +1,11 @@ Changes ======= +Version 0.8.1 (UNRELEASED) +--------------------------- + +- Adds jobs status polling. + Version 0.8.0 (2021-11-22) --------------------------- diff --git a/reana_workflow_engine_snakemake/config.py b/reana_workflow_engine_snakemake/config.py index 4b99137..d2a6924 100644 --- a/reana_workflow_engine_snakemake/config.py +++ b/reana_workflow_engine_snakemake/config.py @@ -9,6 +9,7 @@ """REANA Workflow Engine Snakemake configuration.""" import os +from enum import Enum MOUNT_CVMFS = os.getenv("REANA_MOUNT_CVMFS", "false") @@ -20,3 +21,36 @@ SNAKEMAKE_MAX_PARALLEL_JOBS = 100 """Snakemake maximum number of jobs that can run in parallel.""" + + +# defined in reana-db component, in reana_db/models.py file as JobStatus +class JobStatus(str, Enum): + """Enumeration of job statuses. + + Example: + JobStatus.started == "started" # True + """ + + # FIXME: this state is not defined in reana-db but returned by r-job-controller + started = "started" + + created = "created" + running = "running" + finished = "finished" + failed = "failed" + stopped = "stopped" + queued = "queued" + + +# defined in reana-db component, in reana_db/models.py file as RunStatus +class RunStatus(Enum): + """Enumeration of possible run statuses of a workflow.""" + + created = 0 + running = 1 + finished = 2 + failed = 3 + deleted = 4 + stopped = 5 + queued = 6 + pending = 7 diff --git a/reana_workflow_engine_snakemake/executor.py b/reana_workflow_engine_snakemake/executor.py index 657caff..b3c9362 100644 --- a/reana_workflow_engine_snakemake/executor.py +++ b/reana_workflow_engine_snakemake/executor.py @@ -11,6 +11,8 @@ import os import subprocess import logging +import time +import threading from collections import namedtuple from typing import Callable @@ -26,6 +28,8 @@ LOGGING_MODULE, MOUNT_CVMFS, SNAKEMAKE_MAX_PARALLEL_JOBS, + JobStatus, + RunStatus, ) from reana_workflow_engine_snakemake.utils import ( publish_job_submission, @@ -45,6 +49,11 @@ class REANAClusterExecutor(GenericClusterExecutor): """REANA Cluster Snakemake executor implementation.""" + def __init__(self, *args, **kwargs): + """Initialize REANA Cluster Snakemake executor. Start polling jobs.""" + super().__init__(*args, **kwargs) + self._start_job_polling() + def run( self, job: Job, @@ -145,7 +154,8 @@ def run( ) ) - def _get_container_image(self, job: Job) -> str: + @staticmethod + def _get_container_image(job: Job) -> str: if job.container_img_url: container_image = job.container_img_url.replace("docker://", "") log.info(f"Environment: {container_image}") @@ -154,7 +164,7 @@ def _get_container_image(self, job: Job) -> str: log.info(f"No environment specified, falling back to: {container_image}") return container_image - def _handle_job_status(self, job, status): + def _handle_workflow_status(self, job, status: RunStatus): workflow_uuid = os.getenv("workflow_uuid", "default") job_id = job.reana_job_id log.info(f"{status} job: {job_id}") @@ -163,11 +173,8 @@ def _handle_job_status(self, job, status): **{status: {"total": 1, "job_ids": [job_id]}} ) } - status_running = 1 - status_failed = 3 - status_mapping = {"finished": status_running, "failed": status_failed} self.publisher.publish_workflow_status( - workflow_uuid, status_mapping[status], message=message + workflow_uuid, status.value, message=message ) def handle_job_success(self, job): @@ -177,13 +184,36 @@ def handle_job_success(self, job): job, upload_remote=False, handle_log=False, handle_touch=True ) - self._handle_job_status(job, "finished") + self._handle_workflow_status(job, RunStatus.running) def handle_job_error(self, job): """Override job error method to publish job status.""" super().handle_job_error(job) - self._handle_job_status(job, "failed") + self._handle_workflow_status(job, RunStatus.failed) + + def _get_job_status_from_controller(self, job_id: str) -> str: + response = self.rjc_api_client.check_status(job_id) + return response["status"] + + def _poll_jobs(self): + while True: + with self.lock: + active_jobs = self.active_jobs + + log.debug(f"Active jobs: {self.active_jobs}") + for active_job in active_jobs: + job_id = active_job.job.reana_job_id + status = self._get_job_status_from_controller(job_id) + if status == JobStatus.finished: + self.handle_job_success(active_job.job) + elif status == JobStatus.failed: + self.handle_job_error(active_job.job) + + time.sleep(5) + + def _start_job_polling(self): + threading.Thread(target=self._poll_jobs, daemon=True).start() def submit_job(rjc_api_client, publisher, job_request_body):