Skip to content

Commit

Permalink
Implement asyncio schedulers
Browse files — browse the repository at this point in the history
  • Loading branch information
Blanca-Fuentes committed Dec 16, 2024
1 parent ca0c3c2 commit 010b8fb
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 32 deletions.
11 changes: 6 additions & 5 deletions reframe/core/schedulers/flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
# Lawrence Livermore National Lab
#

import asyncio
import itertools
import os
import time
Expand Down Expand Up @@ -73,7 +74,7 @@ def emit_preamble(self, job):
def make_job(self, *args, **kwargs):
    '''Build and return a scheduler-specific job object.

    All positional and keyword arguments are forwarded unchanged to the
    ``_FluxJob`` constructor.
    '''
    job = _FluxJob(*args, **kwargs)
    return job

def submit(self, job):
async def submit(self, job):
'''Submit a job to the flux executor.'''

flux_future = self._fexecutor.submit(job.fluxjob)
Expand All @@ -89,7 +90,7 @@ def cancel(self, job):
# This will raise JobException with event=cancel (on poll)
flux.job.cancel(flux.Flux(), job._flux_future.jobid())

def poll(self, *jobs):
async def poll(self, *jobs):
'''Poll running Flux jobs for updated states.'''

if jobs:
Expand Down Expand Up @@ -141,13 +142,13 @@ def filternodes(self, job, nodes):
'flux backend does not support node filtering'
)

async def wait(self, job):
    '''Wait until a job is finished.

    Polls the job repeatedly, backing off with a cycling 1s/2s/3s delay.
    Sleeping with ``asyncio.sleep`` yields control to the event loop, so
    other scheduled coroutines keep running while we wait.

    :arg job: the job to wait for.
    '''

    intervals = itertools.cycle([1, 2, 3])
    while not self.finished(job):
        await self.poll(job)
        await asyncio.sleep(next(intervals))

def finished(self, job):
if job.exception:
Expand Down
11 changes: 6 additions & 5 deletions reframe/core/schedulers/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
from reframe.core.exceptions import JobSchedulerError
from reframe.core.schedulers.pbs import PbsJobScheduler

# Asynchronous command runner; ``check=True`` raises on non-zero exit.
_run_strict = functools.partial(osext.run_command_asyncio, check=True)


@register_scheduler('lsf')
Expand Down Expand Up @@ -78,14 +79,14 @@ def emit_preamble(self, job):
# Filter out empty statements before returning
return list(filter(None, preamble))

def submit(self, job):
async def submit(self, job):
with open(job.script_filename, 'r') as fp:
cmd_parts = ['bsub']
if self._sched_access_in_submit:
cmd_parts += job.sched_access

cmd = ' '.join(cmd_parts)
completed = _run_strict(cmd, stdin=fp)
completed = await _run_strict(cmd, stdin=fp)

jobid_match = re.search(r'^Job <(?P<jobid>\S+)> is submitted',
completed.stdout)
Expand All @@ -95,15 +96,15 @@ def submit(self, job):
job._jobid = jobid_match.group('jobid')
job._submit_time = time.time()

def poll(self, *jobs):
async def poll(self, *jobs):
if jobs:
# filter out non-jobs
jobs = [job for job in jobs if job is not None]

if not jobs:
return

completed = _run_strict(
completed = await _run_strict(
'bjobs -o "jobid: user:10 stat: queue:" -noheader '
f'{" ".join(job.jobid for job in jobs)}'
)
Expand Down
18 changes: 11 additions & 7 deletions reframe/core/schedulers/oar.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ def oar_state_pending(state):
return False


# Asynchronous command runner; ``check=True`` raises on non-zero exit.
_run_strict = functools.partial(osext.run_command_asyncio, check=True)
# Synchronous variant, kept for operations that must complete inline
# (e.g. job cancellation).
_run_strict_s = functools.partial(osext.run_command, check=True)


@register_scheduler('oar')
Expand Down Expand Up @@ -104,7 +107,7 @@ def emit_preamble(self, job):

return preamble

def submit(self, job):
async def submit(self, job):
# OAR batch submission mode needs full path to the job script
job_script_fullpath = os.path.join(job.workdir, job.script_filename)
cmd_parts = ['oarsub']
Expand All @@ -114,7 +117,7 @@ def submit(self, job):
# OAR needs -S to submit job in batch mode
cmd_parts += ['-S', job_script_fullpath]
cmd = ' '.join(cmd_parts)
completed = _run_strict(cmd, timeout=self._submit_timeout)
completed = await _run_strict(cmd, timeout=self._submit_timeout)
jobid_match = re.search(r'.*OAR_JOB_ID=(?P<jobid>\S+)',
completed.stdout)
if not jobid_match:
Expand All @@ -125,10 +128,10 @@ def submit(self, job):
job._submit_time = time.time()

def cancel(self, job):
    '''Cancel ``job`` by running ``oardel`` on its job id.

    Uses the synchronous runner (``_run_strict_s``) so the ``oardel``
    command has completed before this method returns.
    '''
    _run_strict_s(f'oardel {job.jobid}', timeout=self._submit_timeout)
    job._cancelled = True

def poll(self, *jobs):
async def poll(self, *jobs):
if jobs:
# Filter out non-jobs
jobs = [job for job in jobs if job is not None]
Expand All @@ -137,7 +140,7 @@ def poll(self, *jobs):
return

for job in jobs:
completed = _run_strict(
completed = await _run_strict(
f'oarstat -fj {job.jobid}'
)

Expand All @@ -154,7 +157,8 @@ def poll(self, *jobs):
# https://github.com/oar-team/oar/blob/37db5384c7827cca2d334e5248172bb700015434/sources/core/qfunctions/oarstat#L332
job_raw_info = completed.stdout
jobid_match = re.search(
r'^(Job_Id|id):\s*(?P<jobid>\S+)', completed.stdout, re.MULTILINE
r'^(Job_Id|id):\s*(?P<jobid>\S+)', completed.stdout,
re.MULTILINE
)
if jobid_match:
jobid = jobid_match.group('jobid')
Expand Down
22 changes: 13 additions & 9 deletions reframe/core/schedulers/pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
# - Initial version submitted by Rafael Escovar, ASML
#

import asyncio
import functools
import os
import itertools
Expand All @@ -35,7 +36,10 @@
PBS_CANCEL_DELAY = 3


# Asynchronous command runner; ``check=True`` raises on non-zero exit.
_run_strict = functools.partial(osext.run_command_asyncio, check=True)
# Synchronous variant, kept for operations that must complete inline
# (e.g. job cancellation).
_run_strict_s = functools.partial(osext.run_command, check=True)


JOB_STATES = {
Expand Down Expand Up @@ -146,7 +150,7 @@ def filternodes(self, job, nodes):
raise NotImplementedError('pbs backend does not support '
'node filtering')

def submit(self, job):
async def submit(self, job):
cmd_parts = ['qsub']
if self._sched_access_in_submit:
cmd_parts += job.sched_access
Expand All @@ -155,7 +159,7 @@ def submit(self, job):
# Slurm wrappers.
cmd_parts += ['-o', job.stdout, '-e', job.stderr, job.script_filename]
cmd = ' '.join(cmd_parts)
completed = _run_strict(cmd, timeout=self._submit_timeout)
completed = await _run_strict(cmd, timeout=self._submit_timeout)
jobid_match = re.search(r'^(?P<jobid>\S+)', completed.stdout)
if not jobid_match:
raise JobSchedulerError('could not retrieve the job id '
Expand All @@ -164,18 +168,18 @@ def submit(self, job):
job._jobid = jobid_match.group('jobid')
job._submit_time = time.time()

async def wait(self, job):
    '''Wait until ``job`` is finished.

    Polls the job repeatedly with a cycling 1s/2s/3s back-off.  The delay
    uses ``asyncio.sleep`` so the event loop stays free to run other
    coroutines while waiting.

    :arg job: the job to wait for.
    '''
    intervals = itertools.cycle([1, 2, 3])
    while not self.finished(job):
        await self.poll(job)
        await asyncio.sleep(next(intervals))

def cancel(self, job):
    '''Cancel ``job`` by running ``qdel`` on its job id.

    If the job was submitted less than ``PBS_CANCEL_DELAY`` seconds ago,
    block until that delay has elapsed before issuing the cancellation —
    presumably so PBS has registered the job before ``qdel`` runs
    (NOTE(review): confirm against the PBS docs).
    '''
    time_from_submit = time.time() - job.submit_time
    if time_from_submit < PBS_CANCEL_DELAY:
        time.sleep(PBS_CANCEL_DELAY - time_from_submit)

    # Synchronous runner: the cancellation must complete before returning.
    _run_strict_s(f'qdel {job.jobid}', timeout=self._submit_timeout)
    job._cancelled = True

def finished(self, job):
Expand Down Expand Up @@ -205,7 +209,7 @@ def _query_exit_code(self, job):

return None

def poll(self, *jobs):
async def poll(self, *jobs):
def output_ready(job):
# We report a job as finished only when its stdout/stderr are
# written back to the working directory
Expand All @@ -220,7 +224,7 @@ def output_ready(job):
if not jobs:
return

completed = osext.run_command(
completed = await osext.run_command_asyncio(
f'qstat -f {" ".join(job.jobid for job in jobs)}'
)

Expand Down
13 changes: 8 additions & 5 deletions reframe/core/schedulers/sge.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
from reframe.core.schedulers.pbs import PbsJobScheduler
from reframe.utility import seconds_to_hms

# Asynchronous command runner; ``check=True`` raises on non-zero exit.
_run_strict = functools.partial(osext.run_command_asyncio, check=True)
# Synchronous variant, kept for operations that must complete inline
# (e.g. job cancellation).
_run_strict_s = functools.partial(osext.run_command, check=True)


@register_scheduler('sge')
Expand Down Expand Up @@ -53,11 +56,11 @@ def emit_preamble(self, job):

return preamble

def submit(self, job):
async def submit(self, job):
# `-o` and `-e` options are only recognized in command line by the PBS,
# SGE, and Slurm wrappers.
cmd = f'qsub -o {job.stdout} -e {job.stderr} {job.script_filename}'
completed = _run_strict(cmd, timeout=self._submit_timeout)
completed = await _run_strict(cmd, timeout=self._submit_timeout)
jobid_match = re.search(r'^Your job (?P<jobid>\S+)', completed.stdout)
if not jobid_match:
raise JobSchedulerError('could not retrieve the job id '
Expand All @@ -66,7 +69,7 @@ def submit(self, job):
job._jobid = jobid_match.group('jobid')
job._submit_time = time.time()

def poll(self, *jobs):
async def poll(self, *jobs):
if jobs:
# Filter out non-jobs
jobs = [job for job in jobs if job is not None]
Expand All @@ -75,7 +78,7 @@ def poll(self, *jobs):
return

user = osext.osuser()
completed = osext.run_command(f'qstat -xml -u {user}')
completed = await osext.run_command_asyncio(f'qstat -xml -u {user}')
if completed.returncode != 0:
raise JobSchedulerError(
f'qstat failed with exit code {completed.returncode} '
Expand Down
2 changes: 1 addition & 1 deletion reframe/core/schedulers/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ async def poll(self, *jobs):
time_from_last_submit = time.time() - m
rem_wait = self.SQUEUE_DELAY - time_from_last_submit
if rem_wait > 0:
time.sleep(rem_wait)
await asyncio.sleep(rem_wait)

# We don't run the command with check=True, because if the job has
# finished already, squeue might return an error about an invalid
Expand Down

0 comments on commit 010b8fb

Please sign in to comment.