diff --git a/auto_process_ngs/simple_scheduler.py b/auto_process_ngs/simple_scheduler.py index 96afab828..27edc6bf1 100644 --- a/auto_process_ngs/simple_scheduler.py +++ b/auto_process_ngs/simple_scheduler.py @@ -1,7 +1,7 @@ #!/usr/bin/env python # # simple_scheduler.py: provide basic scheduler capability -# Copyright (C) University of Manchester 2013-2022 Peter Briggs +# Copyright (C) University of Manchester 2013-2024 Peter Briggs # ######################################################################## # @@ -10,9 +10,9 @@ ######################################################################### -"""Python module to provide scheduler capability for running external +""" +Python module to provide scheduler capability for running external programs - """ ####################################################################### @@ -20,7 +20,6 @@ ####################################################################### import bcftbx.JobRunner as JobRunner -from bcftbx.Pipeline import Job import time import os import sys @@ -170,7 +169,7 @@ def stop(self): logging.debug("\t#%d (%s): \"%s\" (%s)" % ( job.job_number, job.job_id, - job.name, + job.job_name, date_and_time(job.start_time))) job.terminate() @@ -312,7 +311,7 @@ def wait_for(self,names,timeout=None): for job in self.__running: print("\t#%d (%s): \"%s\" (%s)" % (job.job_number, job.job_id, - job.name, + job.job_name, date_and_time(job.start_time))) job.terminate() print("Finished") @@ -573,7 +572,9 @@ def run(self): while not self.__submitted.empty(): job = self.__submitted.get() self.__scheduled.append(job) - logging.debug("Added job #%d (%s): \"%s\"" % (job.job_number,job.name,job)) + logging.debug("Added job #%d (%s): \"%s\"" % (job.job_number, + job.name, + job)) remaining_jobs = [] if self.__submission_mode == SubmissionMode.RANDOM: # Randomise the order of the scheduled jobs @@ -800,19 +801,60 @@ def wait(self,poll_interval=5): logging.debug("Group '%s' (#%s) finished" % (self.group_name, self.group_id)) 
class SchedulerJob:
    """
    Class providing an interface to scheduled jobs

    Set up a job by creating a new instance specifying the job
    runner and command. Optionally name, job number, working
    directory and log directory can also be specified, and the
    job can be forced to wait for one or more other jobs to
    complete.

    The job is started by invoking the 'start' method; its
    status can be checked with the 'is_running' method, and
    terminated and restarted using the 'terminate' and 'restart'
    methods respectively.

    Information about the job can also be accessed via its
    properties:

      job_number    Externally assigned job number
      job_name      Name assigned to job
      job_id        Id for the job assigned by the runner
      command       Command to run
      args          Arguments to supply to the command
      working_dir   Working directory to run jobs in
      log_dir       Directory to write log files to
      log           Log (stdout) file for the job
      err           Error log (stderr) file for the job
      name          Alias for 'job_name'
      start_time    Start time (seconds since the epoch)
      end_time      End time (seconds since the epoch)
      exit_status   Exit code from the command that was run (integer)
      failed        True if submission failed or the job never started
      terminated    True if the job was stopped via 'terminate'

    The class uses a JobRunner instance (which supplies the
    necessary methods for starting, stopping and monitoring) for
    low-level job interactions.

    SchedulerJob instances should normally be returned by a call
    to the 'submit' method of a SimpleScheduler object.

    Arguments:
      runner (JobRunner): a JobRunner instance supplying job control
        methods
      args (list): command and arguments to run
      job_number (int): assigns external job number (optional)
      name (str): explicitly assigns job name (optional, default
        is to use the command)
      working_dir (str): directory to run the script in (optional,
        defaults to CWD)
      log_dir (str): directory to write log files to (optional)
      wait_for (list): list of SchedulerJobs that must complete
        before this one can start
    """
    def __init__(self, runner, args, job_number=None, name=None,
                 working_dir=None, log_dir=None, wait_for=None):
        self.job_number = job_number
        self.log_dir = log_dir
        self.waiting_for = list(wait_for) if wait_for else []
        # First element is the command, the remainder are its arguments
        self.command = args[0]
        self.args = list(args[1:])
        # Job name defaults to the command when not given explicitly
        self.job_name = str(self.command) if name is None else str(name)
        if working_dir is None:
            self.working_dir = os.getcwd()
        else:
            self.working_dir = os.path.abspath(working_dir)
        self.job_id = None
        self.log = None
        self.err = None
        self.submitted = False
        # Status flags are always defined so that callers can read them
        # without first having to hit a failure/terminate code path
        self.failed = False
        self.terminated = False
        self.start_time = None
        self.end_time = None
        self.exit_status = None
        self._runner = runner
        self._restarts = 0
        self._finished = False
        # Time interval to use when checking for job start (seconds)
        # Can be floating point number e.g. 0.1 (= 100ms)
        self._poll_interval = 1
        # Time out period to use before giving up on job submission
        # (seconds)
        self._timeout = 3600

    @property
    def name(self):
        """
        Alias for 'job_name'
        """
        return self.job_name

    @property
    def runner(self):
        """
        Return the associated job runner instance
        """
        return self._runner

    @property
    def is_running(self):
        """
        Check if a job is running

        Returns True if the job has started and is still
        running, False otherwise.
        """
        if not self.submitted:
            return False
        # Refresh the status from the runner before answering
        self._update()
        return not self._finished

    @property
    def in_error_state(self):
        """
        Test if a job is in an error state

        Returns True if the job appears to be in an error
        state, and False if not.
        """
        return self.runner.errorState(self.job_id)

    @property
    def completed(self):
        """
        Test if a job has completed

        Returns True if the job has finished running, False
        otherwise.
        """
        # NOTE(review): the original body is not visible in the diff;
        # this follows the documented contract - confirm against the
        # unabridged source
        return self.submitted and not self.is_running

    @property
    def exit_code(self):
        """
        Return exit code from job

        Returns the integer exit code from the job if it has
        finished running, None otherwise.
        """
        return self.exit_status

    def start(self):
        """
        Start the job running

        If a log directory was set for the job then the runner is
        temporarily switched to it for the submission; the runner's
        own log directory is always restored afterwards, including
        when the submission fails or times out.

        Returns:
          Id for job (None if the submission failed)
        """
        if self.submitted or self._finished:
            # Nothing to do: job was already submitted
            return self.job_id
        use_job_log_dir = self.log_dir is not None
        if use_job_log_dir:
            # Explicitly reset the log directory in the runner
            runner_log_dir = self.runner.log_dir
            self.runner.set_log_dir(self.log_dir)
        try:
            return self._submit()
        finally:
            if use_job_log_dir:
                # Restore original log directory for runner
                self.runner.set_log_dir(runner_log_dir)

    def _submit(self):
        """
        Internal: submit the job to the runner and wait for it to start

        Returns the job id, or None if the runner didn't supply one.
        """
        self.job_id = self.runner.run(self.job_name,
                                      self.working_dir,
                                      self.command,
                                      self.args)
        self.submitted = True
        self.start_time = time.time()
        if self.job_id is None:
            # Failed to submit correctly
            logging.warning(f"{self.job_name}: job submission failed")
            self.failed = True
            self._finished = True
            self.end_time = self.start_time
            return None
        self.log = self.runner.logFile(self.job_id)
        self.err = self.runner.errFile(self.job_id)
        # Wait for evidence that the job has started
        logging.debug(f"Job {self.job_id}: waiting for job to start")
        time_waiting = 0
        while not self._has_started():
            time.sleep(self._poll_interval)
            time_waiting += self._poll_interval
            if time_waiting > self._timeout:
                # Waited too long for job to start, give up
                logging.error(f"Job {self.job_id}: timed out waiting "
                              "for job to start")
                self.failed = True
                self._finished = True
                self.end_time = time.time()
                return self.job_id
        logging.debug(f"Job {self.job_id} started "
                      f"({time.asctime(time.localtime(self.start_time))})")
        return self.job_id

    def _has_started(self):
        """
        Internal: check for evidence that the job has started

        The job counts as started if the runner reports it as running
        or if its log file exists.
        """
        if self.runner.isRunning(self.job_id):
            return True
        # The runner may not supply a log file name
        return self.log is not None and os.path.exists(self.log)

    def wait(self,poll_interval=5,timeout=None):
        """
        Wait for the job to complete

        This method blocks while waiting for the job to finish
        running.

        Arguments:
          poll_interval: optional, number of seconds to wait
            between checks on the job
          timeout: optional, maximum time in seconds that the
            job will be allowed to run before it's terminated
            and a SchedulerTimeout exception is raised
        """
        logging.debug("Waiting for job #%s (%s)..." % (self.job_number,
                                                       self.job_id))
        wait_time = 0
        # NOTE(review): the loop is only partly visible in the diff; it
        # is reconstructed from the documented contract - confirm the
        # timeout comparison and exception message against the
        # unabridged source
        while not self.completed:
            if timeout is not None and wait_time > timeout:
                self.terminate()
                raise SchedulerTimeout("Job #%s timed out after %ss" %
                                       (self.job_number,timeout))
            time.sleep(poll_interval)
            wait_time += poll_interval
        logging.debug("Job #%s finished" % self.job_number)

    def terminate(self):
        """
        Terminate a running job

        Does nothing if the job isn't running.
        """
        if self.is_running:
            self.runner.terminate(self.job_id)
            self.terminated = True
            self._update()

    def restart(self,max_tries=3):
        """
        Restart a running job

        Attempts to restart a job, by terminating the current
        instance and reinvoking the 'start' method.

        A job that has already completed is not restarted. The
        job is also not restarted once the number of restart
        attempts exceeds 'max_tries' (although it will still
        have been terminated).

        Arguments:
          max_tries (int): maximum number of restart attempts
            allowed for the job (default: 3)

        Returns:
          False if the job wasn't restarted, otherwise the value
          returned by the 'start' method.
        """
        # Check if job can be restarted
        if self.completed:
            logging.debug(f"Job {self.job_number}: already completed, "
                          "cannot be restarted")
            return False
        # Terminate the job
        logging.debug(f"Job {self.job_number}: terminating...")
        if self.is_running:
            self.terminate()
            while self.is_running:
                logging.debug(f"Job {self.job_number}: waiting for job to "
                              "stop")
                time.sleep(self._poll_interval)
        # Check if we can restart
        self._restarts += 1
        if self._restarts > max_tries:
            logging.debug(f"Job {self.job_number}: maximum number of "
                          f"restart attempts exceeded ({max_tries})")
            return False
        logging.debug(f"Job {self.job_number}: restarted (attempt "
                      f"#{self._restarts})")
        # Reset flags
        self._finished = False
        self.submitted = False
        self.failed = False
        self.terminated = False
        self.start_time = None
        self.end_time = None
        self.exit_status = None
        # Resubmit
        return self.start()

    def _update(self):
        """
        Internal: check and update status of job

        This method checks if a job has completed and
        updates the internal parameters accordingly.
        """
        if self._finished:
            return
        if not self.runner.isRunning(self.job_id):
            self._finished = True
            self.end_time = time.time()
            self.exit_status = self.runner.exit_status(self.job_id)

    def __repr__(self):
        """
        Return string representation of the job command line
        """
        return ' '.join([str(x) for x in ([self.command] + self.args)])
self.__exit_status[job_id] + return None + def terminate(self,job_id): if job_id in self.__jobs: del(self.__jobs[job_id]) + del(self.__exit_status[job_id]) def errorState(self,job_id): try: @@ -59,6 +75,12 @@ def errorState(self,job_id): def list(self): return self.__jobs.keys() + def set_job_completed(self,job_id,exit_status=0): + # Simulate job completion + if job_id in self.__jobs: + del(self.__jobs[job_id]) + self.__exit_status[job_id] = exit_status + def set_error_state(self,job_id,state): # Allow error state on jobs to be set manually for testing self.__error_states[job_id] = state @@ -937,48 +959,104 @@ def tearDown(self): shutil.rmtree(self.log_dir) def test_scheduler_job_basic(self): - """Basic test of SchedulerJob + """ + SchedulerJob: test running basic job + """ + runner = MockJobRunner() + job = SchedulerJob(runner, ['sleep','50']) + self.assertEqual(job.job_name, "sleep") + self.assertEqual(job.job_number,None) + self.assertEqual(job.job_id, None) + self.assertEqual(job.log_dir, None) + self.assertEqual(job.command, "sleep") + self.assertEqual(job.args, ['50']) + self.assertEqual(job.runner, runner) + self.assertEqual(job.exit_status, None) + self.assertEqual(str(job), "sleep 50") + self.assertFalse(job.is_running) + self.assertEqual(job.log, None) + self.assertEqual(job.err, None) + self.assertFalse(job.completed) + self.assertEqual(job.exit_status, None) + # Start the job + job.start() + self.assertNotEqual(job.job_id, None) + self.assertTrue(job.is_running) + self.assertEqual(job.log, runner.logFile(job.job_id)) + self.assertEqual(job.err, runner.logFile(job.job_id)) + self.assertFalse(job.completed) + self.assertEqual(job.exit_status, None) + # Simulate job completion (no error) + runner.set_job_completed(job.job_id) + self.assertFalse(job.is_running) + self.assertTrue(job.completed) + self.assertEqual(job.exit_status, 0) + def test_scheduler_job_fails(self): """ - job = SchedulerJob(MockJobRunner(),['sleep','50']) - 
self.assertEqual(job.job_name,None) + SchedulerJob: test failing job + """ + runner = MockJobRunner() + job = SchedulerJob(runner, ['ls','*.whereisit']) + self.assertEqual(job.job_name, "ls") self.assertEqual(job.job_number,None) - self.assertEqual(job.log_dir,None) - self.assertEqual(job.command,"sleep 50") + self.assertEqual(job.log_dir, None) + self.assertEqual(job.log, None) + self.assertEqual(job.err, None) + self.assertEqual(job.command,"ls") + self.assertEqual(job.args, ['*.whereisit']) + self.assertEqual(str(job), "ls *.whereisit") self.assertFalse(job.is_running) self.assertFalse(job.completed) + # Start the job job.start() + self.assertNotEqual(job.job_id, None) self.assertTrue(job.is_running) + self.assertEqual(job.log, runner.logFile(job.job_id)) + self.assertEqual(job.err, runner.logFile(job.job_id)) self.assertFalse(job.completed) - job.terminate() + # Simulate job completion with error + runner.set_job_completed(job.job_id, exit_status=1) self.assertFalse(job.is_running) self.assertTrue(job.completed) + self.assertEqual(job.exit_status, 1) def test_scheduler_job_set_log_dir(self): - """Set explicit log_dir for SchedulerJob - + """ + SchedulerJob: explicitly set log directory for job """ runner = MockJobRunner() self.assertEqual(runner.log_dir,None) job = SchedulerJob(runner,['sleep','50'],log_dir='/logs') - self.assertEqual(job.job_name,None) + self.assertEqual(job.job_name, "sleep") self.assertEqual(job.job_number,None) self.assertEqual(job.log_dir,'/logs') - self.assertEqual(job.command,"sleep 50") + self.assertEqual(job.log, None) + self.assertEqual(job.err, None) + self.assertEqual(job.command,"sleep") + self.assertEqual(job.args, ['50']) + self.assertEqual(str(job), "sleep 50") self.assertFalse(job.is_running) self.assertFalse(job.completed) self.assertEqual(runner.log_dir,None) + # Start the job job.start() - self.assertEqual(job.log_dir,'/logs') self.assertTrue(job.is_running) + self.assertEqual(job.log_dir, '/logs') + 
self.assertEqual(job.log, runner.logFile(job.job_id)) + self.assertEqual(job.err, runner.logFile(job.job_id)) + self.assertEqual(os.path.dirname(job.log), "/logs") + self.assertEqual(os.path.dirname(job.err), "/logs") self.assertFalse(job.completed) self.assertEqual(runner.log_dir,None) + # Terminate the job job.terminate() self.assertFalse(job.is_running) self.assertTrue(job.completed) def test_scheduler_job_wait(self): - """Wait for SchedulerJob to complete + """ + SchedulerJob: wait for running job to complete """ self.log_dir = tempfile.mkdtemp() job = SchedulerJob( @@ -993,7 +1071,8 @@ def test_scheduler_job_wait(self): self.assertTrue(job.completed) def test_submitted_scheduler_job_wait(self): - """Wait for submitted SchedulerJob to complete + """ + SchedulerJob: wait for submitted job to complete """ self.log_dir = tempfile.mkdtemp() sched = SimpleScheduler( @@ -1009,7 +1088,8 @@ def test_submitted_scheduler_job_wait(self): self.assertTrue(job.completed) def test_scheduler_job_wait_timeout_raises_exception(self): - """SchedulerJob raises exception if 'wait' timeout exceeded + """ + SchedulerJob: raise exception if 'wait' timeout exceeded """ self.log_dir = tempfile.mkdtemp() job = SchedulerJob( @@ -1022,13 +1102,16 @@ def test_scheduler_job_wait_timeout_raises_exception(self): timeout=1) def test_restart_scheduler_job(self): - """Restart running SchedulerJob + """ + SchedulerJob: restart running job """ job = SchedulerJob(MockJobRunner(),['sleep','50']) - self.assertEqual(job.job_name,None) + self.assertEqual(job.job_name, "sleep") self.assertEqual(job.job_number,None) self.assertEqual(job.log_dir,None) - self.assertEqual(job.command,"sleep 50") + self.assertEqual(job.command,"sleep") + self.assertEqual(job.args, ['50']) + self.assertEqual(str(job), "sleep 50") self.assertFalse(job.is_running) self.assertFalse(job.completed) initial_job_id = job.start() @@ -1044,13 +1127,16 @@ def test_restart_scheduler_job(self): self.assertTrue(job.completed) def 
test_cant_restart_completed_scheduler_job(self): - """Can't restart a completed SchedulerJob + """ + SchedulerJob: don't restart a completed job """ job = SchedulerJob(MockJobRunner(),['sleep','50']) - self.assertEqual(job.job_name,None) + self.assertEqual(job.job_name, "sleep") self.assertEqual(job.job_number,None) self.assertEqual(job.log_dir,None) - self.assertEqual(job.command,"sleep 50") + self.assertEqual(job.command, "sleep") + self.assertEqual(job.args, ['50']) + self.assertEqual(str(job), "sleep 50") job.start() job.terminate() self.assertFalse(job.is_running) @@ -1061,13 +1147,16 @@ def test_cant_restart_completed_scheduler_job(self): self.assertTrue(job.completed) def test_restart_scheduler_job_exceed_max_tries(self): - """Restart running SchedulerJob fails if max attempts exceeded + """ + SchedulerJob: restart fails if maximum attempts exceeded """ job = SchedulerJob(MockJobRunner(),['sleep','50']) - self.assertEqual(job.job_name,None) + self.assertEqual(job.job_name, "sleep") self.assertEqual(job.job_number,None) self.assertEqual(job.log_dir,None) - self.assertEqual(job.command,"sleep 50") + self.assertEqual(job.command, "sleep") + self.assertEqual(job.args, ['50']) + self.assertEqual(str(job), "sleep 50") self.assertFalse(job.is_running) self.assertFalse(job.completed) job_id = job.start()