Skip to content

Commit

Permalink
executor, poll job statuses instead of checking files
Browse files Browse the repository at this point in the history
- poll job-controller for job statuses instead of using files

closes #33
  • Loading branch information
Vladyslav Moisieienkov committed Jan 5, 2022
1 parent 7269f5a commit aef617c
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 34 deletions.
5 changes: 5 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
Changes
=======

Version 0.8.1 (UNRELEASED)
---------------------------

- Adds jobs status polling.

Version 0.8.0 (2021-11-22)
---------------------------

Expand Down
37 changes: 37 additions & 0 deletions reana_workflow_engine_snakemake/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"""REANA Workflow Engine Snakemake configuration."""

import os
from enum import Enum

MOUNT_CVMFS = os.getenv("REANA_MOUNT_CVMFS", "false")

Expand All @@ -20,3 +21,39 @@

SNAKEMAKE_MAX_PARALLEL_JOBS = 100
"""Snakemake maximum number of jobs that can run in parallel."""

POLL_JOBS_STATUS_SLEEP_IN_SECONDS = 10
"""Time to sleep between polling for job status."""


# defined in reana-db component, in reana_db/models.py file as JobStatus
class JobStatus(str, Enum):
"""Enumeration of job statuses.
Example:
JobStatus.started == "started" # True
"""

# FIXME: this state is not defined in reana-db but returned by r-job-controller
started = "started"

created = "created"
running = "running"
finished = "finished"
failed = "failed"
stopped = "stopped"
queued = "queued"


# defined in reana-db component, in reana_db/models.py file as RunStatus
class RunStatus(Enum):
"""Enumeration of possible run statuses of a workflow."""

created = 0
running = 1
finished = 2
failed = 3
deleted = 4
stopped = 5
queued = 6
pending = 7
96 changes: 62 additions & 34 deletions reana_workflow_engine_snakemake/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import os
import subprocess
import logging
import time
from collections import namedtuple
from typing import Callable

Expand All @@ -26,6 +27,9 @@
LOGGING_MODULE,
MOUNT_CVMFS,
SNAKEMAKE_MAX_PARALLEL_JOBS,
POLL_JOBS_STATUS_SLEEP_IN_SECONDS,
JobStatus,
RunStatus,
)
from reana_workflow_engine_snakemake.utils import (
publish_job_submission,
Expand Down Expand Up @@ -79,7 +83,7 @@ def run(
job_request_body = {
"workflow_uuid": workflow_uuid,
"image": container_image,
"cmd": f"cd {workflow_workspace} && {job.shellcmd} && touch {jobfinished} || (touch {jobfailed}; exit 1)",
"cmd": f"cd {workflow_workspace} && {job.shellcmd}",
"prettified_cmd": job.shellcmd,
"workflow_workspace": workflow_workspace,
"job_name": job.name,
Expand Down Expand Up @@ -116,22 +120,6 @@ def run(
# it would immediately check if the output files are present
# and fail otherwise (3 sec timeout).

if job.is_norun:
job_id = "all"
# Manually create the jobfinished for the root rule (`all`)
# to mark it as successful.
try:
subprocess.check_output(
f"touch {jobfinished}", shell=True,
)
except subprocess.CalledProcessError as ex:
log.error(
"Error creating `all` jobfinished file (exit code {}):\n{}".format(
ex.returncode, ex.output.decode()
)
)
error_callback(job)
return
with self.lock:
self.active_jobs.append(
REANAClusterJob(
Expand All @@ -145,7 +133,8 @@ def run(
)
)

def _get_container_image(self, job: Job) -> str:
@staticmethod
def _get_container_image(job: Job) -> str:
if job.container_img_url:
container_image = job.container_img_url.replace("docker://", "")
log.info(f"Environment: {container_image}")
Expand All @@ -154,36 +143,79 @@ def _get_container_image(self, job: Job) -> str:
log.info(f"No environment specified, falling back to: {container_image}")
return container_image

def _handle_job_status(self, job, status):
def _handle_job_status(
self, job, job_status: JobStatus, workflow_status: RunStatus
) -> None:
workflow_uuid = os.getenv("workflow_uuid", "default")
job_id = job.reana_job_id
log.info(f"{status} job: {job_id}")
log.info(f"{job_status} job: {job_id}")
message = {
"progress": build_progress_message(
**{status: {"total": 1, "job_ids": [job_id]}}
**{job_status.name: {"total": 1, "job_ids": [job_id]}}
)
}
status_running = 1
status_failed = 3
status_mapping = {"finished": status_running, "failed": status_failed}
self.publisher.publish_workflow_status(
workflow_uuid, status_mapping[status], message=message
workflow_uuid, workflow_status.value, message=message
)

def handle_job_success(self, job):
def handle_job_success(self, job) -> None:
"""Override job success method to publish job status."""
# override handle_touch = True, to enable `touch()` in Snakefiles
super(ClusterExecutor, self).handle_job_success(
job, upload_remote=False, handle_log=False, handle_touch=True
job, upload_remote=False, handle_log=False, handle_touch=False
)

self._handle_job_status(job, "finished")
self._handle_job_status(
job, job_status=JobStatus.finished, workflow_status=RunStatus.running
)

def handle_job_error(self, job):
def handle_job_error(self, job) -> None:
"""Override job error method to publish job status."""
super().handle_job_error(job)

self._handle_job_status(job, "failed")
self._handle_job_status(
job, job_status=JobStatus.failed, workflow_status=RunStatus.failed
)

def _get_job_status_from_controller(self, job_id: str) -> str:
response = self.rjc_api_client.check_status(job_id)
return response["status"]

def _wait_for_jobs(self):
"""Override _wait_for_jobs method to poll job-controller for job statuses.
Original _wait_for_jobs method was checking .jobfinished or .jobfailed files.
"""
while True:
with self.lock:
if not self.wait:
return
active_jobs = self.active_jobs
self.active_jobs = []
still_running = []

for active_job in active_jobs:
job_id = active_job.job.reana_job_id

try:
status = self._get_job_status_from_controller(job_id)
except Exception as error:
log.error(
f"Error getting status of job with id {job_id}. Details: {error}"
)
status = JobStatus.failed

if status == JobStatus.finished or active_job.job.is_norun:
active_job.callback(active_job.job)
elif status == JobStatus.failed:
active_job.error_callback(active_job.job)
else:
still_running.append(active_job)

with self.lock:
self.active_jobs = still_running

time.sleep(POLL_JOBS_STATUS_SLEEP_IN_SECONDS)


def submit_job(rjc_api_client, publisher, job_request_body):
Expand All @@ -199,10 +231,6 @@ def submit_job(rjc_api_client, publisher, job_request_body):
)
return job_id

# FIXME: Call `job_status = poll_job_status(rjc_api_client, job_id)` instead of
# checking job success/failure via `jobfinished`/`jobfailed` files in .snakemake?
# In that case we would probably need to implement our own `_wait_for_jobs` method.


def run_jobs(
rjc_api_client,
Expand Down

0 comments on commit aef617c

Please sign in to comment.