Add support for job custom requirements purge (#3508)
tcibinan committed May 10, 2024
1 parent f0d8645 commit cc70407
Showing 11 changed files with 354 additions and 87 deletions.
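In short, this commit adds a job preprocessing stage that runs ahead of validation in the grid engine autoscaler. With the new CP_CAP_AUTOSCALE_CUSTOM_REQUIREMENTS_PURGE parameter enabled, root jobs carrying non-default resource requests get their requirements rewritten via qalter so that only the default gpu/mem/exclusive requests remain, and the consumable resource names become explicit parameters instead of environment lookups inside SunGridEngine.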
12 changes: 8 additions & 4 deletions workflows/pipe-common/pipeline/hpc/autoscaler.py
@@ -438,7 +438,7 @@ def select_hosts_to_scale_down(self, hosts):
 
 class GridEngineAutoscaler:
 
-    def __init__(self, grid_engine, job_validator, demand_selector,
+    def __init__(self, grid_engine, job_preprocessor, job_validator, demand_selector,
                  cmd_executor, scale_up_orchestrator, scale_down_orchestrator, host_storage,
                  static_host_storage, scale_up_timeout, scale_down_timeout, max_additional_hosts, idle_timeout=30,
                  clock=Clock()):
@@ -452,6 +452,7 @@ def __init__(self, grid_engine, job_validator, demand_selector,
         and there were no new jobs for the given time interval.
 
         :param grid_engine: Grid engine.
+        :param job_preprocessor: Job preprocessor.
         :param job_validator: Job validator.
         :param demand_selector: Demand selector.
         :param cmd_executor: Cmd executor.
@@ -468,8 +469,9 @@ def __init__(self, grid_engine, job_validator, demand_selector,
         :param idle_timeout: Maximum number of seconds a host could wait for a new job before getting scaled-down.
         """
         self.grid_engine = grid_engine
-        self.demand_selector = demand_selector
+        self.job_preprocessor = job_preprocessor
         self.job_validator = job_validator
+        self.demand_selector = demand_selector
         self.executor = cmd_executor
         self.scale_up_orchestrator = scale_up_orchestrator
         self.scale_down_orchestrator = scale_down_orchestrator
@@ -545,8 +547,10 @@ def scale(self):
         Logger.info('Done: Scaling.')
 
     def _get_valid_jobs(self, jobs):
-        Logger.info('Validating %s jobs...' % len(jobs))
-        valid_jobs, invalid_jobs = self.job_validator.validate(jobs)
+        Logger.info('Preprocessing %s jobs...' % len(jobs))
+        relevant_jobs, _ = self.job_preprocessor.process(jobs)
+        Logger.info('Validating %s jobs...' % len(relevant_jobs))
+        valid_jobs, invalid_jobs = self.job_validator.validate(relevant_jobs)
         if invalid_jobs:
             Logger.warn('The following jobs cannot be satisfied with the requested resources '
                         'and therefore will be killed: #{}'
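Note on the change above: preprocessing now runs before validation, and jobs the preprocessor reports as irrelevant (for example, jobs whose requests were just purged) are dropped from the current scaling cycle. A minimal sketch of the resulting flow, assuming only the process/validate contracts shown in this diff:

    # Sketch only: preprocess first, then validate what remains.
    def get_valid_jobs(preprocessor, validator, jobs):
        relevant_jobs, _ = preprocessor.process(jobs)
        valid_jobs, invalid_jobs = validator.validate(relevant_jobs)
        return valid_jobs, invalid_jobs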
13 changes: 13 additions & 0 deletions workflows/pipe-common/pipeline/hpc/engine/gridengine.py
@@ -204,6 +204,19 @@ def validate(self, jobs):
         pass
 
 
+class GridEngineJobProcessor:
+
+    def process(self, jobs):
+        pass
+
+
+class DoNothingGridEngineJobProcessor(GridEngineJobProcessor):
+
+    def process(self, jobs):
+        relevant_jobs, irrelevant_jobs = jobs, []
+        return relevant_jobs, irrelevant_jobs
+
+
 class GridEngineLaunchAdapter:
 
     def get_worker_init_task_name(self):
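The processor contract introduced here is process(jobs) -> (relevant_jobs, irrelevant_jobs), with DoNothingGridEngineJobProcessor as the pass-through default. A hypothetical custom implementation (not part of this commit), shown only to illustrate the contract:

    # Hypothetical: treat jobs of one user as irrelevant for scaling.
    class SkipUserJobProcessor(GridEngineJobProcessor):

        def __init__(self, skipped_user):
            self._skipped_user = skipped_user

        def process(self, jobs):
            relevant_jobs, irrelevant_jobs = [], []
            for job in jobs:
                if job.user == self._skipped_user:
                    irrelevant_jobs.append(job)
                else:
                    relevant_jobs.append(job)
            return relevant_jobs, irrelevant_jobs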
183 changes: 128 additions & 55 deletions workflows/pipe-common/pipeline/hpc/engine/sge.py
@@ -1,5 +1,4 @@
 import functools
-import os
 from xml.etree import ElementTree
 
 import math
@@ -8,7 +7,8 @@
 
 from pipeline.hpc.cmd import ExecutionError
 from pipeline.hpc.engine.gridengine import GridEngine, GridEngineJobState, GridEngineJob, AllocationRule, \
-    GridEngineType, _perform_command, GridEngineDemandSelector, GridEngineJobValidator, GridEngineLaunchAdapter
+    GridEngineType, _perform_command, GridEngineDemandSelector, GridEngineJobValidator, GridEngineLaunchAdapter, \
+    GridEngineJobProcessor
 from pipeline.hpc.logger import Logger
 from pipeline.hpc.resource import IntegralDemand, ResourceSupply, FractionalDemand, CustomResourceSupply, \
     CustomResourceDemand
@@ -32,16 +32,16 @@ class SunGridEngine(GridEngine):
     _KILL_JOBS = 'qdel %s'
     _FORCE_KILL_JOBS = 'qdel -f %s'
 
-    def __init__(self, cmd_executor, queue, hostlist, queue_default):
+    def __init__(self, cmd_executor, queue, hostlist, queue_default,
+                 gpu_resource_name, mem_resource_name, exc_resource_name):
         self.cmd_executor = cmd_executor
         self.queue = queue
         self.hostlist = hostlist
         self.queue_default = queue_default
         self.tmp_queue_name_attribute = 'tmp_queue_name'
-        # todo: Move to script init function
-        self.gpu_resource_name = os.getenv('CP_CAP_GE_CONSUMABLE_RESOURCE_NAME_GPU', 'gpus')
-        self.mem_resource_name = os.getenv('CP_CAP_GE_CONSUMABLE_RESOURCE_NAME_RAM', 'ram')
-        self.exc_resource_name = os.getenv('CP_CAP_GE_CONSUMABLE_RESOURCE_NAME_EXCLUSIVE', 'exclusive')
+        self.gpu_resource_name = gpu_resource_name
+        self.mem_resource_name = mem_resource_name
+        self.exc_resource_name = exc_resource_name
         self.job_state_to_codes = {
             GridEngineJobState.RUNNING: ['r', 't', 'Rr', 'Rt'],
             GridEngineJobState.PENDING: ['qw', 'qw', 'hqw', 'hqw', 'hRwq', 'hRwq', 'hRwq', 'qw', 'qw'],
@@ -103,45 +103,40 @@ def get_jobs(self):
             job_exc = 0
             job_requests = {}
             hard_requests = job_list.findall('hard_request')
-            for hard_request in hard_requests:
-                hard_request_name = hard_request.get('name', '').strip()
-                if hard_request_name == self.gpu_resource_name:
-                    job_gpu_raw = hard_request.text or '0'
+            for request in hard_requests:
+                request_name = request.get('name', '').strip()
+                request_value = request.text or ''
+                if not request_name or not request_value:
+                    Logger.warn('Job #{job_id} by {job_user} has partial requirement: {name}={value}'
+                                .format(job_id=root_job_id, job_user=job_user,
+                                        name=request_name or '?', value=request_value or '?'))
+                    continue
+                if request_name == self.gpu_resource_name:
                     try:
-                        job_gpu = int(job_gpu_raw)
+                        job_gpu = self._parse_int(request_value)
                     except ValueError:
                         Logger.warn('Job #{job_id} by {job_user} has invalid requirement: {name}={value}'
                                     .format(job_id=root_job_id, job_user=job_user,
-                                            name='gpu', value=job_gpu_raw),
+                                            name='gpu', value=request_value),
                                     trace=True)
-                elif hard_request_name == self.mem_resource_name:
-                    job_mem_raw = hard_request.text or '0G'
+                elif request_name == self.mem_resource_name:
                     try:
-                        job_mem = self._parse_mem(job_mem_raw)
+                        job_mem = self._parse_mem(request_value)
                     except Exception:
                         Logger.warn('Job #{job_id} by {job_user} has invalid requirement: {name}={value}'
                                     .format(job_id=root_job_id, job_user=job_user,
-                                            name='mem', value=job_mem_raw),
+                                            name='mem', value=request_value),
                                     trace=True)
-                elif hard_request_name == self.exc_resource_name:
-                    job_exc_raw = hard_request.text or 'false'
+                elif request_name == self.exc_resource_name:
                     try:
-                        job_exc = int(self._parse_bool(job_exc_raw))
+                        job_exc = int(self._parse_bool(request_value))
                     except Exception:
                         Logger.warn('Job #{job_id} by {job_user} has invalid requirement: {name}={value}'
                                     .format(job_id=root_job_id, job_user=job_user,
-                                            name='exc', value=job_exc_raw),
+                                            name='exc', value=request_value),
                                     trace=True)
-                elif hard_request_name:
-                    job_request_name = hard_request_name
-                    job_request_raw = hard_request.text or '0'
-                    try:
-                        job_request = int(job_request_raw)
-                        job_requests[job_request_name] = job_request
-                    except ValueError:
-                        Logger.warn('Job #{job_id} by {job_user} has unsupported requirement: {name}={value}'
-                                    .format(job_id=root_job_id, job_user=job_user,
-                                            name=job_request_name, value=job_request_raw))
+                else:
+                    job_requests[request_name] = request_value
             for job_id in job_ids:
                 if job_id in jobs:
                     job = jobs[job_id]
@@ -238,11 +233,11 @@ def _get_global_resources(self):
         root = ElementTree.fromstring(output)
         for host in root.findall('host'):
             for resource in host.findall('resourcevalue'):
-                resource_name = resource.get('name') or ''
-                resource_value_raw = resource.text or ''
-                try:
-                    resource_value = int(resource_value_raw.split('.', 1)[0])
-                except ValueError:
+                resource_name = resource.get('name', '').strip()
+                resource_value = resource.text or ''
+                if not resource_name or not resource_value:
+                    Logger.warn('Global has partial resource: {name}={value}'
+                                .format(name=resource_name or '?', value=resource_value or '?'))
                     continue
                 yield resource_name, resource_value
 
@@ -254,31 +249,33 @@ def get_host_supplies(self):
             host_gpu = 0
             host_mem = 0
             host_exc = 0
-            for host_resource in host.findall('resourcevalue'):
-                host_resource_name = host_resource.get('name', '').strip()
-                if host_resource_name == self.gpu_resource_name:
-                    host_gpu_raw = host_resource.text or '0'
+            for resource in host.findall('resourcevalue'):
+                resource_name = resource.get('name', '').strip()
+                resource_value = resource.text or ''
+                if not resource_name or not resource_value:
+                    Logger.warn('Host {host_name} has partial resource: {name}={value}'
+                                .format(host_name=host_name, name=resource_name or '?', value=resource_value or '?'))
+                    continue
+                if resource_name == self.gpu_resource_name:
                     try:
-                        host_gpu = self._parse_int(host_gpu_raw)
+                        host_gpu = self._parse_int(resource_value)
                     except ValueError:
                         Logger.warn('Host {host_name} has invalid resource: {name}={value}'
-                                    .format(host_name=host_name, name='gpu', value=host_gpu_raw),
+                                    .format(host_name=host_name, name='gpu', value=resource_value),
                                     trace=True)
-                elif host_resource_name == self.mem_resource_name:
-                    host_mem_raw = host_resource.text or '0G'
+                elif resource_name == self.mem_resource_name:
                     try:
-                        host_mem = self._parse_mem(host_mem_raw)
+                        host_mem = self._parse_mem(resource_value)
                     except Exception:
                         Logger.warn('Host {host_name} has invalid resource: {name}={value}'
-                                    .format(host_name=host_name, name='mem', value=host_mem_raw),
+                                    .format(host_name=host_name, name='mem', value=resource_value),
                                     trace=True)
-                elif host_resource_name == self.exc_resource_name:
-                    host_exc_raw = host_resource.text or '0'
+                elif resource_name == self.exc_resource_name:
                     try:
-                        host_exc = self._parse_int(host_exc_raw)
+                        host_exc = self._parse_int(resource_value)
                     except Exception:
                         Logger.warn('Host {host_name} has invalid resource: {name}={value}'
-                                    .format(host_name=host_name, name='exc', value=host_exc_raw),
+                                    .format(host_name=host_name, name='exc', value=resource_value),
                                     trace=True)
             for queue in host.findall('queue[@name=\'%s\']' % self.queue):
                 host_slots = int(queue.find('queuevalue[@name=\'slots\']').text or '0')
@@ -374,7 +371,7 @@ def _as_resources_str(self, demand, supply):
                         for key in ['cpu', 'gpu', 'mem', 'exc'])
 
 
-class SunGridEngineCustomDemandSelector(GridEngineDemandSelector):
+class SunGridEngineGlobalDemandSelector(GridEngineDemandSelector):
 
     def __init__(self, inner, grid_engine):
         self._inner = inner
@@ -384,21 +381,51 @@ def select(self, jobs):
         return self._inner.select(list(self.filter(jobs)))
 
     def filter(self, jobs):
-        initial_supply = functools.reduce(operator.add, self._grid_engine.get_global_supplies(), CustomResourceSupply())
+        initial_supplies = map(self._get_int_supply, self._grid_engine.get_global_supplies())
+        initial_supply = functools.reduce(operator.add, initial_supplies, CustomResourceSupply())
         for job in sorted(jobs, key=lambda job: job.root_id):
-            initial_demand = CustomResourceDemand(values={key: value for key, value in job.requests.items()
-                                                          if key in initial_supply.values.keys()})
+            initial_demand = self._get_job_int_demand(job, keys=initial_supply.values.keys())
             remaining_demand, remaining_supply = initial_demand.subtract(initial_supply)
             if remaining_demand:
                 Logger.warn('Ignoring job #{job_id} {job_name} by {job_user} because '
-                            'it requires custom resources which are not available at the moment: '
+                            'it requires global resources which are not available at the moment: '
                             '{job_resources}...'
                             .format(job_id=job.id, job_name=job.name, job_user=job.user,
                                     job_resources=self._as_resources_str(initial_demand, initial_supply)))
                 continue
             initial_supply = remaining_supply
             yield job
 
+    def _get_job_int_demand(self, job, keys):
+        return CustomResourceDemand(values=dict(self._get_job_int_requests(job, keys)))
+
+    def _get_job_int_requests(self, job, keys):
+        for request_name, request_value in job.requests.items():
+            if request_name not in keys:
+                continue
+            try:
+                yield request_name, self._parse_int(request_value)
+            except ValueError:
+                Logger.warn('Job #{job_id} by {job_user} has unsupported requirement: {name}={value}'
+                            .format(job_id=job.root_id, job_user=job.user,
+                                    name=request_name, value=request_value),
+                            trace=True)
+
+    def _get_int_supply(self, supply):
+        return CustomResourceSupply(values=dict(self._get_int_resources(supply)))
+
+    def _get_int_resources(self, supply):
+        for resource_name, resource_value in supply.values.items():
+            try:
+                yield resource_name, self._parse_int(resource_value)
+            except ValueError:
+                Logger.warn('Global has unsupported resource: {name}={value}'
+                            .format(name=resource_name, value=resource_value),
+                            trace=True)
+
+    def _parse_int(self, value):
+        return int(float(value))
+
     def _as_resources_str(self, custom_demand, custom_supply):
         return ', '.join('{demand}/{supply} {name}'
                          .format(name=key,
@@ -517,6 +544,52 @@ def validate(self, jobs):
         return valid_jobs, invalid_jobs
 
 
+class SunGridEngineCustomRequestsPurgeJobProcessor(GridEngineJobProcessor):
+
+    def __init__(self, cmd_executor, gpu_resource_name, mem_resource_name, exc_resource_name, dry_run):
+        self._cmd_executor = cmd_executor
+        self._gpu_resource_name = gpu_resource_name
+        self._mem_resource_name = mem_resource_name
+        self._exc_resource_name = exc_resource_name
+        self._default_resource_names = [self._gpu_resource_name,
+                                        self._mem_resource_name,
+                                        self._exc_resource_name]
+        self._dry_run = dry_run
+        self._cmd = 'qalter {job_id} -l {job_requests}'
+
+    def process(self, jobs):
+        relevant_jobs, irrelevant_jobs = [], []
+        for job in jobs:
+            if job.root_id != job.id:
+                relevant_jobs.append(job)
+                continue
+            if all(request_name in self._default_resource_names for request_name in job.requests):
+                relevant_jobs.append(job)
+                continue
+            try:
+                Logger.info('Purging job #{} custom requirements...'.format(job.id))
+                self._purge_custom_requests(job)
+                irrelevant_jobs.append(job)
+            except Exception:
+                Logger.warn('Job #{} custom requirements purge has failed'.format(job.id), crucial=True, trace=True)
+                relevant_jobs.append(job)
+        return relevant_jobs, irrelevant_jobs
+
+    def _purge_custom_requests(self, job):
+        if self._dry_run:
+            return
+        job_default_requests = {}
+        if job.gpu:
+            job_default_requests[self._gpu_resource_name] = str(job.gpu)
+        if job.mem:
+            job_default_requests[self._mem_resource_name] = str(job.mem) + 'G'
+        if job.exc:
+            job_default_requests[self._exc_resource_name] = str(bool(job.exc)).lower()
+        self._cmd_executor.execute(self._cmd.format(
+            job_id=job.root_id,
+            job_requests=','.join('{}={}'.format(k, v) for k, v in job_default_requests.items())))
+
+
 class SunGridEngineLaunchAdapter(GridEngineLaunchAdapter):
 
     def __init__(self, queue, hostlist):
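To make the purge concrete: for a hypothetical root job #42 that requested 2 gpus, 8G of ram, exclusive placement and some custom resource, _purge_custom_requests rebuilds only the default requests and issues a qalter that resets the job's hard resource list, so the custom request disappears simply because it is never re-added:

    # Hypothetical job values; resource names are the defaults from param.py.
    job_default_requests = {'gpus': '2', 'ram': '8G', 'exclusive': 'true'}
    cmd = 'qalter {job_id} -l {job_requests}'.format(
        job_id=42,
        job_requests=','.join('{}={}'.format(k, v) for k, v in job_default_requests.items()))
    print(cmd)  # qalter 42 -l gpus=2,ram=8G,exclusive=true

Such a job is reported as irrelevant for the current cycle and gets picked up again with its rewritten requests on the next one.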
12 changes: 12 additions & 0 deletions workflows/pipe-common/pipeline/hpc/param.py
@@ -256,6 +256,9 @@ def __init__(self):
         self.custom_requirements = GridEngineParameter(
             name='CP_CAP_AUTOSCALE_CUSTOM_REQUIREMENTS', type=PARAM_BOOL, default=True,
             help='Enables custom requirements processing.')
+        self.custom_requirements_purge = GridEngineParameter(
+            name='CP_CAP_AUTOSCALE_CUSTOM_REQUIREMENTS_PURGE', type=PARAM_BOOL, default=False,
+            help='Enables custom requirements purge.')
         self.grid_engine = GridEngineParameter(
             name='CP_CAP_AUTOSCALE_GRID_ENGINE', type=PARAM_STR, default=None,
             help='Specifies grid engine type.\n'
@@ -295,6 +298,15 @@ def __init__(self):
         self.master_cores = GridEngineParameter(
             name='CP_CAP_SGE_MASTER_CORES', type=PARAM_INT, default=None,
             help='Specifies a number of available cores on a cluster manager.')
+        self.gpu_resource_name = GridEngineParameter(
+            name='CP_CAP_GE_CONSUMABLE_RESOURCE_NAME_GPU', type=PARAM_STR, default='gpus',
+            help='Specifies gpu resource name.')
+        self.mem_resource_name = GridEngineParameter(
+            name='CP_CAP_GE_CONSUMABLE_RESOURCE_NAME_RAM', type=PARAM_STR, default='ram',
+            help='Specifies ram resource name.')
+        self.exc_resource_name = GridEngineParameter(
+            name='CP_CAP_GE_CONSUMABLE_RESOURCE_NAME_EXCLUSIVE', type=PARAM_STR, default='exclusive',
+            help='Specifies exclusive resource name.')
         self.sge_grid_engine = GridEngineParameter(
             name='CP_CAP_SGE', type=PARAM_BOOL, default=False,
             help='Enables SGE grid engine.')
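A hypothetical launch environment exercising the new parameters (values are illustrative; all but the purge flag match the defaults above):

    environment = {
        'CP_CAP_AUTOSCALE_CUSTOM_REQUIREMENTS_PURGE': 'true',         # default: false
        'CP_CAP_GE_CONSUMABLE_RESOURCE_NAME_GPU': 'gpus',             # default
        'CP_CAP_GE_CONSUMABLE_RESOURCE_NAME_RAM': 'ram',              # default
        'CP_CAP_GE_CONSUMABLE_RESOURCE_NAME_EXCLUSIVE': 'exclusive',  # default
    }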
