Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

'simple scheduler': remove dependency on bcftbx 'Job' class #1008

Open
wants to merge 3 commits into
base: devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
254 changes: 200 additions & 54 deletions auto_process_ngs/simple_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python
#
# simple_scheduler.py: provide basic scheduler capability
# Copyright (C) University of Manchester 2013-2022 Peter Briggs
# Copyright (C) University of Manchester 2013-2024 Peter Briggs
#
########################################################################
#
Expand All @@ -10,17 +10,16 @@
#########################################################################


"""Python module to provide scheduler capability for running external
"""
Python module to provide scheduler capability for running external
programs

"""

#######################################################################
# Import modules that this module depends on
#######################################################################

import bcftbx.JobRunner as JobRunner
from bcftbx.Pipeline import Job
import time
import os
import sys
Expand Down Expand Up @@ -170,7 +169,7 @@ def stop(self):
logging.debug("\t#%d (%s): \"%s\" (%s)" % (
job.job_number,
job.job_id,
job.name,
job.job_name,
date_and_time(job.start_time)))
job.terminate()

Expand Down Expand Up @@ -312,7 +311,7 @@ def wait_for(self,names,timeout=None):
for job in self.__running:
print("\t#%d (%s): \"%s\" (%s)" % (job.job_number,
job.job_id,
job.name,
job.job_name,
date_and_time(job.start_time)))
job.terminate()
print("Finished")
Expand Down Expand Up @@ -573,7 +572,9 @@ def run(self):
while not self.__submitted.empty():
job = self.__submitted.get()
self.__scheduled.append(job)
logging.debug("Added job #%d (%s): \"%s\"" % (job.job_number,job.name,job))
logging.debug("Added job #%d (%s): \"%s\"" % (job.job_number,
job.name,
job))
remaining_jobs = []
if self.__submission_mode == SubmissionMode.RANDOM:
# Randomise the order of the scheduled jobs
Expand Down Expand Up @@ -800,59 +801,136 @@ def wait(self,poll_interval=5):
logging.debug("Group '%s' (#%s) finished" % (self.group_name,
self.group_id))

class SchedulerJob(Job):
"""Class providing an interface to scheduled jobs
class SchedulerJob:
"""
Class providing an interface to scheduled jobs

Set up a job by creating a new instance specifying the job
runner and command. Optionally name, job number, working
directory and log directory can also be specified, and the
job can be forced to wait for one or more other jobs to
complete.

The job is started by invoking the 'start' method; its
status can be checked with the 'is_running' method, and
terminated and restarted using the 'terminate' and 'restart'
methods respectively.

Information about the job can also be accessed via its
properties:

job_number Externally assigned job number
job_name Name assigned to job
job_id Id for the job assigned by the runner
command Command to run
args Arguments to supply to the command
working_dir Working directory to run jobs in
log_dir Directory to write log files to
log Log (stdout) file for the job
err Error log (stderr) file for the job
name Alias for 'job_name'
start_time Start time (seconds since the epoch)
end_time End time (seconds since the epoch)
exit_status Exit code from the command that was run (integer)

The Job class uses a JobRunner instance (which supplies the
necessary methods for starting, stopping and monitoring) for
low-level job interactions.

SchedulerJob instances should normally be returned by a
call to the 'submit' method of a SimpleScheduler object.

Arguments:
runner (JobRunner): a JobRunner instance supplying job control
methods
args (list): command and arguments to run
job_number (int): assigns external job number (optional)
name (str): explicitly assigns job name (optional, default
is to use the command)
working_dir (str): directory to run the script in (optional,
defaults to CWD)
log_dir (str): directory to write log files to (optional)
wait_for (list): list of SchedulerJobs that must complete
before this one can start
"""

def __init__(self,runner,args,job_number=None,name=None,working_dir=None,
log_dir=None,wait_for=None):
"""Create a new SchedulerJob instance

"""
def __init__(self, runner, args, job_number=None, name=None,
working_dir=None, log_dir=None, wait_for=None):
self.job_number = job_number
self.job_name = name
self.log_dir = log_dir
if wait_for:
self.waiting_for = list(wait_for)
else:
self.waiting_for = list()
self.command = ' '.join([str(arg) for arg in args])
if name is None:
name = args[0]
self.command = args[0]
self.args = [arg for arg in args[1:]]
if self.job_name is None:
self.job_name = str(self.command)
else:
self.job_name = str(name)
if working_dir is None:
working_dir = os.getcwd()
self.working_dir = os.getcwd()
else:
working_dir = os.path.abspath(working_dir)
Job.__init__(self,runner,name,working_dir,args[0],args[1:])
self.working_dir = os.path.abspath(working_dir)
self.job_id = None
self.log = None
self.err = None
self.submitted = False
self.start_time = None
self.end_time = None
self.exit_status = None
self._runner = runner
self._restarts = 0
self._finished = False
# Time interval to use when checking for job start (seconds)
# Can be floating point number e.g. 0.1 (= 100ms)
self._poll_interval = 1
# Time out period to use before giving up on job submission
# (seconds)
self._timeout = 3600

@property
def name(self):
"""
Alias for 'job_name'
"""
return self.job_name

@property
def runner(self):
"""
Return the associated job runner instance
"""
return self._runner

@property
def is_running(self):
"""Test if a job is running
"""
Check if a job is running

Returns True if the job has started and is still running,
False otherwise.

"""
# Check if job is running
return self.isRunning()
if not self.submitted:
return False
self._update()
return not self._finished

@property
def in_error_state(self):
"""Test if a job is in an error state
"""
Test if a job is in an error state

Returns True if the job appears to be in an error state,
and False if not.
"""
return self.errorState()
return self.runner.errorState(self.job_id)

@property
def completed(self):
"""Test if a job has completed
"""
Test if a job has completed

Returns True if the job has finished running, False
otherwise.
Expand All @@ -863,32 +941,68 @@ def completed(self):

@property
def exit_code(self):
"""Return exit code from job
"""
Return exit code from job

Wrapper for the 'exit_status' property provided by the
'Job' superclass.
Returns the integer exit code from the job if it has
finished running, None otherwise.
"""
return self.exit_status

def start(self):
"""Start the job running

Overrides the 'start' method in the base 'Job' class.
"""
Start the job running

Returns:
Id for job

"""
if self.log_dir is not None:
# Explicitly reset the log directory in the runner
runner_log_dir = self.runner.log_dir
self.runner.set_log_dir(self.log_dir)
job_id = Job.start(self)
if not self.submitted and not self._finished:
self.job_id = self.runner.run(self.job_name,
self.working_dir,
self.command,
self.args)
self.submitted = True
self.start_time = time.time()
if self.job_id is None:
# Failed to submit correctly
logging.warning(f"{self.job_name}: job submission failed")
self.failed = True
self._finished = True
self.end_time = self.start_time
return
self.submitted = True
self.start_time = time.time()
self.log = self.runner.logFile(self.job_id)
self.err = self.runner.errFile(self.job_id)
# Wait for evidence that the job has started
logging.debug(f"Job {self.job_id}: waiting for job to start")
time_waiting = 0
while not self.runner.isRunning(self.job_id) and \
not os.path.exists(self.log):
time.sleep(self._poll_interval)
time_waiting += self._poll_interval
if time_waiting > self._timeout:
# Waited too long for job to start, give up
logging.error(f"Job {self.job_id}: timed out waiting "
"for job to start")
self.failed = True
self._finished = True
self._update()
return self.job_id
logging.debug(f"Job {self.job_id} started "
f"({time.asctime(time.localtime(self.start_time))})")
if self.log_dir is not None:
# Restore original log directory for runner
self.runner.set_log_dir(runner_log_dir)
return job_id
return self.job_id

def wait(self,poll_interval=5,timeout=None):
"""Wait for the job to complete
"""
Wait for the job to complete

This method blocks while waiting for the job to finish
running.
Expand All @@ -906,7 +1020,6 @@ def wait(self,poll_interval=5,timeout=None):
in seconds that the job will be allowed to run before
it's terminated and a SchedulerTimeout exception
is raised

"""
logging.debug("Waiting for job #%s (%s)..." % (self.job_number,
self.job_id))
Expand All @@ -924,8 +1037,18 @@ def wait(self,poll_interval=5,timeout=None):
wait_time += poll_interval
logging.debug("Job #%s finished" % self.job_number)

def terminate(self):
"""
Terminate a running job
"""
if self.is_running:
self.runner.terminate(self.job_id)
self.terminated = True
self._update()

def restart(self,max_tries=3):
"""Restart running the job
"""
Restart a running job

Attempts to restart a job, by terminating the current
instance and reinvoking the 'start' method.
Expand All @@ -948,31 +1071,54 @@ def restart(self,max_tries=3):
"""
# Check if job can be restarted
if self.completed:
logging.debug("Job #%s: already completed, cannot "
"restart" % self.job_number)
logging.debug(f"Job {self.job_number}: already completed, "
"cannot be restarted")
return False
# Terminate the job
logging.debug("Job #%s: terminating before resubmission"
% self.job_number)
self.terminate()
logging.debug(f"Job {self.job_number}: terminating...")
if self.is_running:
self.terminate()
while self.is_running:
logging.debug("Job {self.job_number}: waiting for job to "
"stop")
time.sleep(self._poll_interval)
# Check if we can restart
self._restarts += 1
if self._restarts > max_tries:
logging.debug("Job #%s: maximum number of restart "
"attempts exceeded (%d)" %
(self.job_number,max_tries))
logging.debug(f"Job {self.job_number}: maximum number of "
f"restart attempts exceeded (max_tries)")
return False
else:
logging.debug("Job #%s: restarted (attempt #%d)" %
(self.job_number,self._restarts))
#return self.start()
return Job.restart(self)
logging.debug(f"Job {self.job_number}: restarted (attempt "
f"#{self._restarts})")
# Reset flags
self._finished = False
self.submitted = False
self.terminated = False
self.start_time = None
self.end_time = None
self.exit_status = None
# Resubmit
return self.start()

def _update(self):
"""
Internal: check and update status of job

def __repr__(self):
"""Return string representation of the job command line
This method checks if a job has completed and
updates the internal parameters accordingly.
"""
if not self._finished:
if not self.runner.isRunning(self.job_id):
self._finished = True
self.end_time = time.time()
self.exit_status = self.runner.exit_status(self.job_id)

def __repr__(self):
"""
Return string representation of the job command line
"""
return ' '.join([str(x) for x in ([self.script,] + self.args)])
return ' '.join([str(x) for x in ([self.command] + self.args)])

class SchedulerCallback:
"""Class providing an interface to scheduled callbacks
Expand Down
Loading