Skip to content

Commit

Permalink
executor, add job status polling
Browse files Browse the repository at this point in the history
closes #33
  • Loading branch information
Vladyslav Moisieienkov committed Dec 20, 2021
1 parent 7269f5a commit bf5c9b6
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 12 deletions.
5 changes: 5 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
Changes
=======

Version 0.8.1 (UNRELEASED)
---------------------------

- Adds job status polling.

Version 0.8.0 (2021-11-22)
---------------------------

Expand Down
37 changes: 37 additions & 0 deletions reana_workflow_engine_snakemake/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"""REANA Workflow Engine Snakemake configuration."""

import os
from enum import Enum

MOUNT_CVMFS = os.getenv("REANA_MOUNT_CVMFS", "false")

Expand All @@ -20,3 +21,39 @@

SNAKEMAKE_MAX_PARALLEL_JOBS = 100
"""Snakemake maximum number of jobs that can run in parallel."""

# Unit is seconds: the executor's job-polling loop passes this value to
# ``time.sleep()`` after each round of status checks.
POLL_JOBS_STATUS_SLEEP_IN_SECONDS = 5
"""Time to sleep between polling for job status."""


# defined in reana-db component, in reana_db/models.py file as JobStatus
class JobStatus(str, Enum):
    """Enumeration of job statuses.

    Members mix in ``str``, so each member compares equal to its plain
    string value and can be looked up from that string.

    Example:
        JobStatus.started == "started"  # True
        JobStatus("failed") is JobStatus.failed  # True
    """

    # FIXME: this state is not defined in reana-db but returned by r-job-controller
    started = "started"

    created = "created"
    running = "running"
    finished = "finished"
    failed = "failed"
    stopped = "stopped"
    queued = "queued"


# defined in reana-db component, in reana_db/models.py file as RunStatus
class RunStatus(Enum):
    """Enumeration of possible run statuses of a workflow.

    Each member's ``value`` is the integer code for that status; the
    executor publishes ``RunStatus.<member>.value`` when reporting the
    workflow status.
    """

    created = 0
    running = 1
    finished = 2
    failed = 3
    deleted = 4
    stopped = 5
    queued = 6
    pending = 7
pending = 7
61 changes: 49 additions & 12 deletions reana_workflow_engine_snakemake/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import os
import subprocess
import logging
import time
import threading
from collections import namedtuple
from typing import Callable

Expand All @@ -26,6 +28,9 @@
LOGGING_MODULE,
MOUNT_CVMFS,
SNAKEMAKE_MAX_PARALLEL_JOBS,
POLL_JOBS_STATUS_SLEEP_IN_SECONDS,
JobStatus,
RunStatus,
)
from reana_workflow_engine_snakemake.utils import (
publish_job_submission,
Expand All @@ -45,6 +50,11 @@
class REANAClusterExecutor(GenericClusterExecutor):
"""REANA Cluster Snakemake executor implementation."""

def __init__(self, *args, **kwargs):
    """Initialize REANA Cluster Snakemake executor. Start polling jobs.

    All positional and keyword arguments are forwarded unchanged to the
    parent executor's ``__init__``. Once the parent is initialised, the
    background job-status polling thread is started.
    """
    super().__init__(*args, **kwargs)
    # Start polling only after the parent initialiser has run.
    # NOTE(review): the polling loop reads ``self.lock`` and
    # ``self.active_jobs``, presumably set up by the parent -- confirm.
    self._start_job_polling()

def run(
self,
job: Job,
Expand Down Expand Up @@ -145,7 +155,8 @@ def run(
)
)

def _get_container_image(self, job: Job) -> str:
@staticmethod
def _get_container_image(job: Job) -> str:
if job.container_img_url:
container_image = job.container_img_url.replace("docker://", "")
log.info(f"Environment: {container_image}")
Expand All @@ -154,36 +165,62 @@ def _get_container_image(self, job: Job) -> str:
log.info(f"No environment specified, falling back to: {container_image}")
return container_image

def _handle_workflow_status(
    self, job, job_status: JobStatus, workflow_status: RunStatus
) -> None:
    """Publish a workflow status update triggered by a job state change.

    :param job: Job whose state changed; its ``reana_job_id`` attribute is
        reported in the progress message.
    :param job_status: New status of the job. Its ``name`` is used as the
        progress-message key under which the job is counted.
    :param workflow_status: Workflow status to publish; its integer
        ``value`` is what gets sent.
    """
    # The workflow UUID is read from the process environment, falling back
    # to the literal "default" when the variable is unset.
    workflow_uuid = os.getenv("workflow_uuid", "default")
    job_id = job.reana_job_id
    log.info(f"{job_status} job: {job_id}")
    message = {
        "progress": build_progress_message(
            **{job_status.name: {"total": 1, "job_ids": [job_id]}}
        )
    }
    self.publisher.publish_workflow_status(
        workflow_uuid, workflow_status.value, message=message
    )

def handle_job_success(self, job) -> None:
    """Override job success method to publish job status.

    :param job: The job that completed successfully.
    """
    # override handle_touch = True, to enable `touch()` in Snakefiles
    # ``super(ClusterExecutor, self)`` starts the method lookup *after*
    # ``ClusterExecutor`` in the MRO, so that class's own
    # ``handle_job_success`` is deliberately bypassed.
    super(ClusterExecutor, self).handle_job_success(
        job, upload_remote=False, handle_log=False, handle_touch=True
    )

    # A finished job is published with workflow status ``running``
    # (presumably because other jobs of the workflow may still be pending).
    self._handle_workflow_status(
        job, job_status=JobStatus.finished, workflow_status=RunStatus.running
    )

def handle_job_error(self, job) -> None:
    """Override job error method to publish job status.

    :param job: The job that failed.
    """
    super().handle_job_error(job)

    # A failed job is published together with workflow status ``failed``.
    self._handle_workflow_status(
        job, job_status=JobStatus.failed, workflow_status=RunStatus.failed
    )

def _get_job_status_from_controller(self, job_id: str) -> str:
response = self.rjc_api_client.check_status(job_id)
return response["status"]

def _poll_jobs(self):
    """Poll the job controller for the status of active jobs, forever.

    Each round takes a snapshot of ``self.active_jobs``, asks the job
    controller for the status of every job in it, and dispatches to
    ``handle_job_success`` or ``handle_job_error`` for finished and failed
    jobs. Jobs in any other state are left alone. The loop then sleeps for
    ``POLL_JOBS_STATUS_SLEEP_IN_SECONDS`` seconds. This method never
    returns, so it is meant to run in a background thread.
    """
    while True:
        with self.lock:
            # Copy, don't alias: binding the list itself would leave the
            # iteration below reading shared state outside the lock.
            active_jobs = list(self.active_jobs)

        log.debug(f"Active jobs: {active_jobs}")
        for active_job in active_jobs:
            job_id = active_job.job.reana_job_id
            try:
                status = self._get_job_status_from_controller(job_id)
            except Exception:
                # An uncaught exception would end this thread and silently
                # stop all polling; log it and retry on the next round.
                log.exception(f"Could not get status of job: {job_id}")
                continue

            # NOTE(review): a job that is still in ``self.active_jobs`` on
            # the next round is reported again -- confirm that it is
            # removed from the list elsewhere once it completes.
            if status == JobStatus.finished:
                self.handle_job_success(active_job.job)
            elif status == JobStatus.failed:
                self.handle_job_error(active_job.job)

        time.sleep(POLL_JOBS_STATUS_SLEEP_IN_SECONDS)

def _start_job_polling(self) -> None:
threading.Thread(target=self._poll_jobs, daemon=True).start()


def submit_job(rjc_api_client, publisher, job_request_body):
Expand Down

0 comments on commit bf5c9b6

Please sign in to comment.