From e2c1efe5934060f1c491a4f61febd92fcb79cc3b Mon Sep 17 00:00:00 2001 From: Andrey Tsibin Date: Fri, 29 Dec 2023 15:47:55 +0200 Subject: [PATCH] Support kubernetes in grid engine autoscaler (#3418) * Support kubernetes in grid engine autoscaler --- .../pipe-common/pipeline/hpc/autoscaler.py | 17 +- .../pipeline/hpc/engine/gridengine.py | 65 +++- .../pipe-common/pipeline/hpc/engine/kube.py | 310 ++++++++++++++++++ .../pipe-common/pipeline/hpc/engine/sge.py | 38 ++- .../pipe-common/pipeline/hpc/engine/slurm.py | 34 +- workflows/pipe-common/pipeline/hpc/param.py | 20 +- .../scripts/autoscale_grid_engine.py | 50 +-- .../scripts/manage_sge_profiles.py | 2 +- workflows/pipe-common/shell/kube_setup_master | 9 + .../test/test_kube_demand_selector_default.py | 120 +++++++ .../pipe-common/test/test_kube_grid_engine.py | 281 ++++++++++++++++ .../test/test_kube_resource_parser.py | 66 ++++ .../pipe-common/test/test_scale_up_handler.py | 2 + .../pipe-common/test/test_sge_grid_engine.py | 8 +- .../test/test_slurm_grid_engine.py | 10 +- 15 files changed, 964 insertions(+), 68 deletions(-) create mode 100644 workflows/pipe-common/pipeline/hpc/engine/kube.py create mode 100644 workflows/pipe-common/test/test_kube_demand_selector_default.py create mode 100644 workflows/pipe-common/test/test_kube_grid_engine.py create mode 100644 workflows/pipe-common/test/test_kube_resource_parser.py diff --git a/workflows/pipe-common/pipeline/hpc/autoscaler.py b/workflows/pipe-common/pipeline/hpc/autoscaler.py index ba8886c372..dc483c0322 100644 --- a/workflows/pipe-common/pipeline/hpc/autoscaler.py +++ b/workflows/pipe-common/pipeline/hpc/autoscaler.py @@ -22,7 +22,7 @@ import itertools import time -from pipeline.hpc.engine.gridengine import GridEngineJobState, GridEngineType +from pipeline.hpc.engine.gridengine import GridEngineJobState from pipeline.hpc.logger import Logger from pipeline.hpc.resource import IntegralDemand from pipeline.hpc.utils import Clock @@ -129,7 +129,7 @@ class GridEngineScaleUpHandler: _GE_POLL_TIMEOUT = 60 _GE_POLL_ATTEMPTS = 6 - def __init__(self, cmd_executor, api, grid_engine, host_storage, parent_run_id, instance_disk, + def __init__(self, cmd_executor, api, grid_engine, launch_adapter, host_storage, parent_run_id, instance_disk, instance_image, cmd_template, price_type, region_id, queue, hostlist, owner_param_name, polling_timeout=_POLL_TIMEOUT, polling_delay=_POLL_DELAY, ge_polling_timeout=_GE_POLL_TIMEOUT, instance_launch_params=None, clock=Clock()): @@ -141,6 +141,7 @@ def __init__(self, cmd_executor, api, grid_engine, host_storage, parent_run_id, :param cmd_executor: Cmd executor. :param api: Cloud pipeline client. :param grid_engine: Grid engine client. + :param launch_adapter: Grid engine launch adapter. :param host_storage: Additional hosts storage. :param parent_run_id: Additional nodes parent run id. :param instance_disk: Additional nodes disk size. @@ -159,6 +160,7 @@ def __init__(self, cmd_executor, api, grid_engine, host_storage, parent_run_id, self.executor = cmd_executor self.api = api self.grid_engine = grid_engine + self.launch_adapter = launch_adapter self.host_storage = host_storage self.parent_run_id = parent_run_id self.instance_disk = instance_disk @@ -293,8 +295,9 @@ def _await_worker_initialization(self, run_id): if run['initialized']: Logger.info('Additional worker #%s has been marked as initialized.' % run_id) Logger.info('Checking additional worker #%s grid engine initialization status...' 
                    % run_id)
-            run_sge_tasks = self.api.load_task(run_id, self.get_grid_engine_worker_task_name())
-            if any(run_sge_task.get('status') == 'SUCCESS' for run_sge_task in run_sge_tasks):
+            run_grid_engine_tasks = self.api.load_task(run_id, self.launch_adapter.get_worker_init_task_name())
+            if any(run_grid_engine_task.get('status') == 'SUCCESS'
+                   for run_grid_engine_task in run_grid_engine_tasks):
                 Logger.info('Additional worker #%s has been initialized.' % run_id)
                 return
             Logger.info('Additional worker #%s hasn\'t been initialized yet. Only %s attempts remain left.'
@@ -305,12 +308,6 @@ def _await_worker_initialization(self, run_id):
         Logger.warn(error_msg, crucial=True)
         raise ScalingError(error_msg)
 
-    def get_grid_engine_worker_task_name(self):
-        if self.grid_engine.get_engine_type() == GridEngineType.SLURM:
-            return 'SLURMWorkerSetup'
-        else:
-            return 'SGEWorkerSetup'
-
     def _enable_worker_in_grid_engine(self, pod):
         Logger.info('Enabling additional worker %s in grid engine...' % pod.name)
         attempts = self.ge_polling_timeout / self.polling_delay if self.polling_delay \
diff --git a/workflows/pipe-common/pipeline/hpc/engine/gridengine.py b/workflows/pipe-common/pipeline/hpc/engine/gridengine.py
index 3a94d4ed8a..75e9d9028e 100644
--- a/workflows/pipe-common/pipeline/hpc/engine/gridengine.py
+++ b/workflows/pipe-common/pipeline/hpc/engine/gridengine.py
@@ -12,6 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import math
+from datetime import datetime
+
 from pipeline.hpc.logger import Logger
 
 
@@ -19,7 +22,7 @@ def _perform_command(action, msg, error_msg, skip_on_failure):
     Logger.info(msg)
     try:
         action()
-    except RuntimeError as e:
+    except Exception as e:
         Logger.warn(error_msg)
         if not skip_on_failure:
             raise RuntimeError(error_msg, e)
@@ -29,6 +32,7 @@ class GridEngineType:
 
     SGE = "SGE"
     SLURM = "SLURM"
+    KUBE = "KUBE"
 
     def __init__(self):
         pass
@@ -83,21 +87,11 @@ class GridEngineJobState:
     COMPLETED = 'completed'
     UNKNOWN = 'unknown'
 
-    _letter_codes_to_states = {
-        # Job statuses: [SGE] + [SLURM]
-        RUNNING: ['r', 't', 'Rr', 'Rt'] + ['RUNNING'],
-        PENDING: ['qw', 'qw', 'hqw', 'hqw', 'hRwq', 'hRwq', 'hRwq', 'qw', 'qw'] + ['PENDING'],
-        SUSPENDED: ['s', 'ts', 'S', 'tS', 'T', 'tT', 'Rs', 'Rts', 'RS', 'RtS', 'RT', 'RtT'] + ['SUSPENDED', 'STOPPED'],
-        ERROR: ['Eqw', 'Ehqw', 'EhRqw'] + ['DEADLINE', ' FAILED'],
-        DELETED: ['dr', 'dt', 'dRr', 'dRt', 'ds', 'dS', 'dT', 'dRs', 'dRS', 'dRT'] + ['DELETED', 'CANCELLED'],
-        COMPLETED: [] + ['COMPLETED', 'COMPLETING']
-    }
-
     @staticmethod
-    def from_letter_code(code):
-        for key in GridEngineJobState._letter_codes_to_states:
-            if code in GridEngineJobState._letter_codes_to_states[key]:
-                return key
+    def from_letter_code(code, state_to_codes):
+        for state, codes in state_to_codes.items():
+            if code in codes:
+                return state
         return GridEngineJobState.UNKNOWN
 
 
@@ -206,3 +200,44 @@ class GridEngineJobValidator:
 
     def validate(self, jobs):
         pass
+
+
+class GridEngineLaunchAdapter:
+
+    def get_worker_init_task_name(self):
+        pass
+
+    def get_worker_launch_params(self):
+        pass
+
+
+class GridEngineResourceParser:
+
+    def __init__(self, datetime_format, cpu_unit=None, cpu_modifiers=None, mem_unit=None, mem_modifiers=None):
+        self._datetime_format = datetime_format
+        self._cpu_unit = cpu_unit
+        self._cpu_modifiers = cpu_modifiers or {}
+        self._mem_unit = mem_unit
+        self._mem_modifiers = mem_modifiers or {}
+
+    def parse_date(self, timestamp):
+        return datetime.strptime(timestamp, self._datetime_format)
+
+    def parse_cpu(self, quantity):
+        return self._parse_resource(quantity, modifiers=self._cpu_modifiers, unit=self._cpu_unit)
+
+    def parse_mem(self, quantity):
+        return self._parse_resource(quantity, modifiers=self._mem_modifiers, unit=self._mem_unit)
+
+    def _parse_resource(self, quantity, modifiers, unit):
+        if not quantity:
+            return 0
+        if len(quantity) > 1 and quantity[-2:] in modifiers:
+            value, value_unit = int(quantity[:-2]), quantity[-2:]
+        elif len(quantity) > 0 and quantity[-1:] in modifiers:
+            value, value_unit = int(quantity[:-1]), quantity[-1:]
+        else:
+            value, value_unit = int(quantity), ''
+        modifier = modifiers[value_unit]
+        output = float(value * modifier)
+        return int(math.ceil(output / modifiers.get(unit, 1)))
diff --git a/workflows/pipe-common/pipeline/hpc/engine/kube.py b/workflows/pipe-common/pipeline/hpc/engine/kube.py
new file mode 100644
index 0000000000..79c8e5634e
--- /dev/null
+++ b/workflows/pipe-common/pipeline/hpc/engine/kube.py
@@ -0,0 +1,310 @@
+# Copyright 2017-2023 EPAM Systems, Inc. (https://www.epam.com/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ +import functools +import os + +import operator +import pykube + +from pipeline.hpc.engine.gridengine import GridEngine, GridEngineDemandSelector, GridEngineJobValidator, \ + AllocationRuleParsingError, GridEngineType, GridEngineJob, GridEngineJobState, _perform_command, \ + GridEngineLaunchAdapter, GridEngineResourceParser +from pipeline.hpc.logger import Logger +from pipeline.hpc.resource import IntegralDemand, ResourceSupply + + +def get_kube_client(): + try: + return pykube.HTTPClient(pykube.KubeConfig.from_service_account()) + except Exception: + kube_config_path = os.path.join(os.path.expanduser('~'), '.kube', 'config') + return pykube.HTTPClient(pykube.KubeConfig.from_file(kube_config_path)) + + +class KubeGridEngine(GridEngine): + + def __init__(self, kube, resource_parser, owner): + self._kube = kube + self._resource_parser = resource_parser + self._owner = owner + self._job_state_to_codes = { + GridEngineJobState.RUNNING: ['Running'], + GridEngineJobState.PENDING: ['Pending'], + GridEngineJobState.SUSPENDED: [], + GridEngineJobState.ERROR: ['Failed'], + GridEngineJobState.DELETED: [], + GridEngineJobState.COMPLETED: ['Succeeded'], + GridEngineJobState.UNKNOWN: ['Unknown'] + } + + def get_engine_type(self): + return GridEngineType.KUBE + + def get_jobs(self): + try: + return list(self._get_jobs()) + except Exception: + Logger.warn('Grid engine jobs listing has failed.') + return [] + + def _get_jobs(self): + for pod in pykube.Pod.objects(self._kube): + try: + yield self._get_job(pod) + except Exception: + Logger.warn('Ignoring job {job_name} because its processing has failed...' + .format(job_name=pod.name), + trace=True) + + def _get_job(self, pod): + job_root_id = pod.obj.get('metadata', {}).get('uid', '-') + job_id = job_root_id + job_name = pod.name + job_user = self._owner + job_state = GridEngineJobState.from_letter_code(pod.obj.get('status', {}).get('phase'), + self._job_state_to_codes) + job_datetime = self._resource_parser.parse_date(pod.obj.get('metadata', {}).get('creationTimestamp') + or pod.obj.get('status', {}).get('startTime')) + job_host = pod.obj.get('spec', {}).get('nodeName') + job_hosts = [job_host] if job_host else [] + pod_demand = self._get_pod_demand(pod) + return GridEngineJob( + id=job_id, + root_id=job_root_id, + name=job_name, + user=job_user, + state=job_state, + datetime=job_datetime, + hosts=job_hosts, + cpu=pod_demand.cpu, + gpu=pod_demand.gpu, + mem=pod_demand.mem + ) + + def disable_host(self, host): + pykube.Node.objects(self._kube).get_by_name(host).cordon() + + def enable_host(self, host): + pykube.Node.objects(self._kube).get_by_name(host).uncordon() + + def get_pe_allocation_rule(self, pe): + raise AllocationRuleParsingError('Kube grid engine does not support parallel environments.') + + def delete_host(self, host, skip_on_failure=False): + _perform_command( + action=lambda: pykube.Node.objects(self._kube).get_by_name(host).delete(), + msg='Remove host from GE.', + error_msg='Removing host from GE has failed.', + skip_on_failure=skip_on_failure + ) + + def get_host_supplies(self): + for node in pykube.Node.objects(self._kube): + node_taints = node.obj.get('spec', {}).get('taints', []) + if any(node_taint.get('effect') == 'NoSchedule' for node_taint in node_taints): + continue + yield self._get_node_supply(node) + + def get_host_supply(self, host): + node = pykube.Node.objects(self._kube).get_by_name(host) + return self._get_node_supply(node) + + def _get_node_supply(self, node): + return self._get_node_supply_total(node) - 
self._get_node_supply_used(node) + + def _get_node_supply_total(self, node): + node_allocatable = node.obj.get('status', {}).get('allocatable', {}) + return ResourceSupply(cpu=self._resource_parser.parse_cpu(node_allocatable.get('cpu', '0'))) + + def _get_node_supply_used(self, node): + pod_demands = list(self._get_pod_demand(pod) for pod in self._get_pods(node)) + return functools.reduce(operator.add, pod_demands, IntegralDemand()) + + def _get_pod_demand(self, pod): + pod_demand = functools.reduce(operator.add, self._get_pod_demands(pod), IntegralDemand()) + if pod_demand.cpu < 1: + return pod_demand + IntegralDemand(cpu=1) + return pod_demand + + def _get_pod_demands(self, pod): + containers = pod.obj.get('spec', {}).get('containers', []) + for container in containers: + container_requests = container.get('resources', {}).get('requests', {}) + container_cpu = 0 + container_cpu_raw = container_requests.get('cpu') or '0' + try: + container_cpu = self._resource_parser.parse_cpu(container_cpu_raw) + except Exception: + Logger.warn('Job {job_name} has invalid requirement: {name}={value}' + .format(job_name=pod.name, name='cpu', value=container_cpu_raw), + trace=True) + container_mem = 0 + container_mem_raw = container_requests.get('memory') or '0' + try: + container_mem = self._resource_parser.parse_mem(container_mem_raw) + except Exception: + Logger.warn('Job {job_name} has invalid requirement: {name}={value}' + .format(job_name=pod.name, name='mem', value=container_mem_raw), + trace=True) + yield IntegralDemand(cpu=container_cpu, + mem=container_mem) + + def _get_pods(self, node): + return pykube.Pod.objects(self._kube).filter(field_selector={'spec.nodeName': node.name}) + + def is_valid(self, host): + try: + node = pykube.Node.objects(self._kube).get_by_name(host) + node_conditions = node.obj.get('status', {}).get('conditions', []) + for node_condition in node_conditions: + node_condition_type = node_condition.get('type') + node_condition_status = node_condition.get('status') + if node_condition_type != 'Ready': + continue + if node_condition_status == 'True': + return True + Logger.warn('Execution host {host} is not ready which makes host invalid' + .format(host=host)) + return False + except Exception: + Logger.warn('Execution host {host} validation has failed'.format(host=host), trace=True) + return False + + def kill_jobs(self, jobs, force=False): + for job in jobs: + self._kill_job(job) + + def _kill_job(self, job): + pykube.Pod.objects(self._kube).get_by_name(job.id).delete() + + +class KubeDefaultDemandSelector(GridEngineDemandSelector): + + def __init__(self, grid_engine): + self.grid_engine = grid_engine + + def select(self, jobs): + initial_supplies = list(self.grid_engine.get_host_supplies()) + for job in sorted(jobs, key=lambda job: job.root_id): + if job.hosts: + Logger.warn('Ignoring job #{job_id} {job_name} by {job_user} because ' + 'it is pending even though ' + 'it is associated with specific hosts: ' + '{job_hosts}...' + .format(job_id=job.id, job_name=job.name, job_user=job.user, + job_hosts=', '.join(job.hosts))) + continue + initial_demand = IntegralDemand(cpu=job.cpu, gpu=job.gpu, mem=job.mem, owner=job.user) + if any(not (initial_demand - initial_supply) for initial_supply in initial_supplies): + initial_supply = functools.reduce(operator.add, initial_supplies, ResourceSupply()) + Logger.warn('Ignoring job #{job_id} {job_name} by {job_user} because ' + 'it is pending even though ' + 'it requires resources which are available at the moment: ' + '{job_resources}...' 
+                            .format(job_id=job.id, job_name=job.name, job_user=job.user,
+                                    job_resources=self._as_resources_str(initial_demand,
+                                                                         initial_supply)))
+                continue
+
+            yield initial_demand
+
+    def _as_resources_str(self, demand, supply):
+        return ', '.join('{demand}/{supply} {name}'
+                         .format(name=key,
+                                 demand=getattr(demand, key),
+                                 supply=getattr(supply, key))
+                         for key in ['cpu', 'gpu', 'mem'])
+
+
+class KubeJobValidator(GridEngineJobValidator):
+
+    def __init__(self, grid_engine, instance_max_supply, cluster_max_supply):
+        self.grid_engine = grid_engine
+        self.instance_max_supply = instance_max_supply
+        self.cluster_max_supply = cluster_max_supply
+
+    def validate(self, jobs):
+        valid_jobs, invalid_jobs = [], []
+        for job in jobs:
+            job_demand = IntegralDemand(cpu=job.cpu, gpu=job.gpu, mem=job.mem)
+            if job_demand > self.instance_max_supply:
+                Logger.warn('Invalid job #{job_id} {job_name} by {job_user} requires resources '
+                            'which cannot be satisfied by the biggest instance in cluster: '
+                            '{job_cpu}/{available_cpu} cpu, '
+                            '{job_gpu}/{available_gpu} gpu, '
+                            '{job_mem}/{available_mem} mem.'
+                            .format(job_id=job.id, job_name=job.name, job_user=job.user,
+                                    job_cpu=job.cpu, available_cpu=self.instance_max_supply.cpu,
+                                    job_gpu=job.gpu, available_gpu=self.instance_max_supply.gpu,
+                                    job_mem=job.mem, available_mem=self.instance_max_supply.mem),
+                            crucial=True)
+                invalid_jobs.append(job)
+                continue
+            valid_jobs.append(job)
+        return valid_jobs, invalid_jobs
+
+
+class KubeLaunchAdapter(GridEngineLaunchAdapter):
+
+    def __init__(self):
+        pass
+
+    def get_worker_init_task_name(self):
+        return 'KubeWorkerSetup'
+
+    def get_worker_launch_params(self):
+        return {
+            'CP_CAP_KUBE': 'false'
+        }
+
+
+class KubeResourceParser:
+
+    def __init__(self):
+        self._inner = GridEngineResourceParser(
+            datetime_format='%Y-%m-%dT%H:%M:%SZ',
+            cpu_unit='',
+            cpu_modifiers={
+                'm': 0.001,
+                '': 1
+            },
+            mem_unit='Gi',
+            mem_modifiers={
+                'm': 0.001,
+                '': 1,
+                'k': 1000, 'M': 1000 ** 2, 'G': 1000 ** 3, 'T': 1000 ** 4, 'P': 1000 ** 5, 'E': 1000 ** 6,
+                'Ki': 1024, 'Mi': 1024 ** 2, 'Gi': 1024 ** 3, 'Ti': 1024 ** 4, 'Pi': 1024 ** 5, 'Ei': 1024 ** 6,
+            })
+
+    def parse_date(self, timestamp):
+        return self._inner.parse_date(timestamp)
+
+    def parse_cpu(self, quantity):
+        return self._inner.parse_cpu(quantity)
+
+    def parse_mem(self, quantity):
+        return self._inner.parse_mem(quantity)
diff --git a/workflows/pipe-common/pipeline/hpc/engine/sge.py b/workflows/pipe-common/pipeline/hpc/engine/sge.py
index ad3c999ff8..610e218de1 100644
--- a/workflows/pipe-common/pipeline/hpc/engine/sge.py
+++ b/workflows/pipe-common/pipeline/hpc/engine/sge.py
@@ -8,7 +8,7 @@
 
 from pipeline.hpc.cmd import ExecutionError
 from pipeline.hpc.engine.gridengine import GridEngine, GridEngineJobState, GridEngineJob, AllocationRule, \
-    GridEngineType, _perform_command, GridEngineDemandSelector, GridEngineJobValidator
+    GridEngineType, _perform_command, GridEngineDemandSelector, GridEngineJobValidator, GridEngineLaunchAdapter
 from pipeline.hpc.logger import Logger
 from pipeline.hpc.resource import IntegralDemand, ResourceSupply, FractionalDemand, CustomResourceSupply, \
     CustomResourceDemand
@@ -41,6 +41,18 @@ def __init__(self, cmd_executor, queue, hostlist, queue_default):
         # todo: Move to script init function
         self.gpu_resource_name = os.getenv('CP_CAP_GE_CONSUMABLE_RESOURCE_NAME_GPU', 'gpus')
         self.mem_resource_name = os.getenv('CP_CAP_GE_CONSUMABLE_RESOURCE_NAME_RAM', 'ram')
+        self.job_state_to_codes = {
+            GridEngineJobState.RUNNING: ['r', 't', 'Rr', 'Rt'],
+            GridEngineJobState.PENDING: ['qw', 'hqw', 'hRwq'],
+            GridEngineJobState.SUSPENDED: ['s', 'ts', 'S', 'tS', 'T', 'tT', 'Rs', 'Rts', 'RS', 'RtS', 'RT', 'RtT'],
+            GridEngineJobState.ERROR: ['Eqw', 'Ehqw', 'EhRqw'],
+            GridEngineJobState.DELETED: ['dr', 'dt', 'dRr', 'dRt', 'ds', 'dS', 'dT', 'dRs', 'dRS', 'dRT'],
+            GridEngineJobState.COMPLETED: [],
+            GridEngineJobState.UNKNOWN: []
+        }
+
+    def get_engine_type(self):
+        return GridEngineType.SGE
 
     def get_jobs(self):
         try:
@@ -78,7 +90,7 @@ def get_jobs(self):
             job_ids = ['{}.{}'.format(root_job_id, job_task) for job_task in job_tasks] or [root_job_id]
             job_name = job_list.findtext('JB_name')
             job_user = job_list.findtext('JB_owner')
-            job_state = GridEngineJobState.from_letter_code(job_list.findtext('state'))
+            job_state = GridEngineJobState.from_letter_code(job_list.findtext('state'), self.job_state_to_codes)
             job_datetime = self._parse_date(
                 job_list.findtext('JAT_start_time') or job_list.findtext('JB_submission_time'))
             job_hosts = [job_host] if job_host else []
@@ -160,9 +172,6 @@ def _parse_array(self, array_jobs):
         return result
 
     def _parse_mem(self, mem_request):
-        """
-        See https://linux.die.net/man/1/sge_types
-        """
         if not mem_request:
             return 0
         modifiers = {
@@ -231,9 +240,6 @@ def get_host_supply(self, host):
                 return ResourceSupply(cpu=int(line.strip().split()[1]))
         return ResourceSupply()
 
-    def get_engine_type(self):
-        return GridEngineType.SGE
-
     def _shutdown_execution_host(self, host, skip_on_failure):
         _perform_command(
             action=lambda: self.cmd_executor.execute(SunGridEngine._SHUTDOWN_HOST_EXECUTION_DAEMON % host),
@@ -413,3 +419,19 @@ def validate(self, jobs):
             valid_jobs.append(job)
         return valid_jobs, invalid_jobs
+
+class SunGridEngineLaunchAdapter(GridEngineLaunchAdapter):
+
+    def __init__(self, queue, hostlist):
+        self._queue = queue
+        self._hostlist = hostlist
+
+    def get_worker_init_task_name(self):
+        return 'SGEWorkerSetup'
+
+    def get_worker_launch_params(self):
+        return {
+            'CP_CAP_SGE': 'false',
+            'CP_CAP_SGE_QUEUE_NAME': self._queue,
+            'CP_CAP_SGE_HOSTLIST_NAME': self._hostlist
+        }
diff --git a/workflows/pipe-common/pipeline/hpc/engine/slurm.py b/workflows/pipe-common/pipeline/hpc/engine/slurm.py
index 075764b912..1b569251e3 100644
--- a/workflows/pipe-common/pipeline/hpc/engine/slurm.py
+++ b/workflows/pipe-common/pipeline/hpc/engine/slurm.py
@@ -6,7 +6,8 @@
 from pipeline.hpc.logger import Logger
 from pipeline.hpc.resource import IntegralDemand, ResourceSupply
 from pipeline.hpc.engine.gridengine import GridEngine, GridEngineJobState, GridEngineJob, \
-    GridEngineType, _perform_command, GridEngineDemandSelector, GridEngineJobValidator, AllocationRuleParsingError
+    GridEngineType, _perform_command, GridEngineDemandSelector, GridEngineJobValidator, AllocationRuleParsingError, \
+    GridEngineLaunchAdapter
 
 
 class SlurmGridEngine(GridEngine):
@@ -25,6 +26,18 @@ class SlurmGridEngine(GridEngine):
 
     def __init__(self, cmd_executor):
         self.cmd_executor = cmd_executor
+        self.job_state_to_codes = {
+            GridEngineJobState.RUNNING: ['RUNNING'],
+            GridEngineJobState.PENDING: ['PENDING'],
+            GridEngineJobState.SUSPENDED: ['SUSPENDED', 'STOPPED'],
+            GridEngineJobState.ERROR: ['DEADLINE', 'FAILED'],
+            GridEngineJobState.DELETED: ['DELETED', 'CANCELLED'],
+            GridEngineJobState.COMPLETED: ['COMPLETED', 'COMPLETING'],
+            GridEngineJobState.UNKNOWN: []
+        }
+
+    def get_engine_type(self):
+        return GridEngineType.SLURM
 
     def get_jobs(self):
         try:
@@ -71,9 +84,6 @@ def get_host_supply(self, host):
                    - 
ResourceSupply(cpu=int(node_desc.get("CPUAlloc", "0"))) return ResourceSupply() - def get_engine_type(self): - return GridEngineType.SLURM - def is_valid(self, host): node_state = self._get_host_state(host) for bad_state in SlurmGridEngine._NODE_BAD_STATES: @@ -114,7 +124,7 @@ def _parse_jobs(self, scontrol_jobs_output): num_tasks_str = job_dict.get('NumTasks', '1') num_tasks = int(num_tasks_str) if num_tasks_str.isdigit() else 1 - job_state = GridEngineJobState.from_letter_code(job_dict.get('JobState')) + job_state = GridEngineJobState.from_letter_code(job_dict.get('JobState'), self.job_state_to_codes) if job_state == GridEngineJobState.PENDING: # In certain cases pending job's start date can be estimated start date. # It confuses autoscaler and therefore should be ignored. @@ -271,3 +281,17 @@ def validate(self, jobs): continue valid_jobs.append(job) return valid_jobs, invalid_jobs + + +class SlurmLaunchAdapter(GridEngineLaunchAdapter): + + def __init__(self): + pass + + def get_worker_init_task_name(self): + return 'SLURMWorkerSetup' + + def get_worker_launch_params(self): + return { + 'CP_CAP_SLURM': 'false' + } diff --git a/workflows/pipe-common/pipeline/hpc/param.py b/workflows/pipe-common/pipeline/hpc/param.py index 27495a4862..bdc796d16c 100644 --- a/workflows/pipe-common/pipeline/hpc/param.py +++ b/workflows/pipe-common/pipeline/hpc/param.py @@ -252,6 +252,13 @@ def __init__(self): self.custom_requirements = GridEngineParameter( name='CP_CAP_AUTOSCALE_CUSTOM_REQUIREMENTS', type=PARAM_BOOL, default=True, help='Enables custom requirements processing.') + self.grid_engine = GridEngineParameter( + name='CP_CAP_AUTOSCALE_GRID_ENGINE', type=PARAM_STR, default=None, + help='Specifies grid engine type.\n' + 'Allowed values: SGE, SLURM and KUBE.') + self.polling_delay = GridEngineParameter( + name='CP_CAP_AUTOSCALE_POLLING_DELAY', type=PARAM_INT, default=10, + help='Specifies a polling delay in seconds for grid engine jobs') class GridEngineQueueParameters(GridEngineParametersGroup): @@ -277,12 +284,15 @@ def __init__(self): self.master_cores = GridEngineParameter( name='CP_CAP_SGE_MASTER_CORES', type=PARAM_INT, default=None, help='Specifies a number of available cores on a cluster manager.') - self.sge_selected = GridEngineParameter( - name='CP_CAP_SGE', type=PARAM_BOOL, default=True, - help='Defines if SGE selected as grid engine.') - self.slurm_selected = GridEngineParameter( + self.sge_grid_engine = GridEngineParameter( + name='CP_CAP_SGE', type=PARAM_BOOL, default=False, + help='Enables SGE grid engine.') + self.slurm_grid_engine = GridEngineParameter( name='CP_CAP_SLURM', type=PARAM_BOOL, default=False, - help='Defines if Slurm selected as grid engine.') + help='Enables Slurm grid engine.') + self.kube_grid_engine = GridEngineParameter( + name='CP_CAP_KUBE', type=PARAM_BOOL, default=False, + help='Enables Kubernetes grid engine.') class GridEngineParameters(GridEngineParametersGroup): diff --git a/workflows/pipe-common/scripts/autoscale_grid_engine.py b/workflows/pipe-common/scripts/autoscale_grid_engine.py index 1ffeb6125b..6eec612411 100644 --- a/workflows/pipe-common/scripts/autoscale_grid_engine.py +++ b/workflows/pipe-common/scripts/autoscale_grid_engine.py @@ -17,9 +17,9 @@ import multiprocessing import os import traceback -from datetime import timedelta import sys +from datetime import timedelta from pipeline.hpc.autoscaler import \ GridEngineAutoscalingDaemon, GridEngineAutoscaler, \ @@ -28,11 +28,14 @@ DoNothingAutoscalingDaemon, DoNothingScaleUpHandler, 
DoNothingScaleDownHandler from pipeline.hpc.cloud import CloudProvider from pipeline.hpc.cmd import CmdExecutor -from pipeline.hpc.event import GridEngineEventManager from pipeline.hpc.engine.gridengine import GridEngineType +from pipeline.hpc.engine.kube import KubeGridEngine, KubeJobValidator, KubeDefaultDemandSelector, KubeLaunchAdapter, \ + KubeResourceParser, get_kube_client from pipeline.hpc.engine.sge import SunGridEngine, SunGridEngineDefaultDemandSelector, SunGridEngineJobValidator, \ - SunGridEngineCustomDemandSelector -from pipeline.hpc.engine.slurm import SlurmGridEngine, SlurmDemandSelector, SlurmJobValidator + SunGridEngineCustomDemandSelector, SunGridEngineLaunchAdapter +from pipeline.hpc.engine.slurm import SlurmGridEngine, SlurmDemandSelector, SlurmJobValidator, \ + SlurmLaunchAdapter +from pipeline.hpc.event import GridEngineEventManager from pipeline.hpc.host import FileSystemHostStorage, ThreadSafeHostStorage from pipeline.hpc.instance.avail import InstanceAvailabilityManager from pipeline.hpc.instance.provider import DefaultInstanceProvider, \ @@ -51,7 +54,7 @@ from pipeline.utils.path import mkdir -def fetch_instance_launch_params(api, master_run_id, grid_engine_type, queue, hostlist): +def fetch_instance_launch_params(api, launch_adapter, master_run_id): parent_run = api.load_run(master_run_id) master_system_params = {param.get('name'): param.get('resolvedValue') for param in parent_run.get('pipelineRunParameters', [])} @@ -66,16 +69,7 @@ def fetch_instance_launch_params(api, master_run_id, grid_engine_type, queue, ho if not param_value: continue launch_params[param_name] = param_value - if grid_engine_type == GridEngineType.SLURM: - launch_params.update({ - 'CP_CAP_SLURM': 'false' - }) - else: - launch_params.update({ - 'CP_CAP_SGE': 'false', - 'CP_CAP_SGE_QUEUE_NAME': queue, - 'CP_CAP_SGE_HOSTLIST_NAME': hostlist - }) + launch_params.update(launch_adapter.get_worker_launch_params()) launch_params.update({ 'CP_CAP_AUTOSCALE': 'false', 'CP_CAP_AUTOSCALE_WORKERS': '0', @@ -124,10 +118,14 @@ def init_static_hosts(default_hostfile, static_host_storage, clock, active_timeo def get_daemon(): params = GridEngineParameters() - grid_engine_type = GridEngineType.SLURM if params.queue.slurm_selected.get() else GridEngineType.SGE + grid_engine_type = params.autoscaling_advanced.grid_engine.get() \ + or (GridEngineType.KUBE if params.queue.kube_grid_engine.get() + else GridEngineType.SLURM if params.queue.slurm_grid_engine.get() + else GridEngineType.SGE) api_url = os.environ['API'] + cluster_owner = os.getenv('OWNER', 'root') cluster_hostfile = os.environ['DEFAULT_HOSTFILE'] cluster_master_run_id = os.environ['RUN_ID'] cluster_master_name = os.getenv('HOSTNAME', 'pipeline-' + str(cluster_master_run_id)) @@ -210,6 +208,8 @@ def get_daemon(): hybrid_instance_family = params.autoscaling.hybrid_instance_family.get() \ or common_utils.extract_family_from_instance_type(instance_cloud_provider, instance_type) + polling_delay = params.autoscaling_advanced.polling_delay.get() + scale_up_strategy = params.autoscaling.scale_up_strategy.get() scale_up_batch_size = params.autoscaling.scale_up_batch_size.get() scale_up_polling_delay = params.autoscaling.scale_up_polling_delay.get() @@ -256,9 +256,6 @@ def get_daemon(): if dry_run: Logger.info('Using dry run mode...') - instance_launch_params = fetch_instance_launch_params(api, cluster_master_run_id, - grid_engine_type, queue_name, queue_hostlist_name) - clock = Clock() # TODO: Git rid of CmdExecutor usage in favor of CloudPipelineExecutor 
implementation cmd_executor = CmdExecutor() @@ -401,6 +398,15 @@ def get_daemon(): job_validator = SlurmJobValidator(grid_engine=grid_engine, instance_max_supply=biggest_instance_supply, cluster_max_supply=cluster_supply) demand_selector = SlurmDemandSelector(grid_engine=grid_engine) + launch_adapter = SlurmLaunchAdapter() + elif grid_engine_type == GridEngineType.KUBE: + kube_client = get_kube_client() + resource_parser = KubeResourceParser() + grid_engine = KubeGridEngine(kube=kube_client, resource_parser=resource_parser, owner=cluster_owner) + job_validator = KubeJobValidator(grid_engine=grid_engine, instance_max_supply=biggest_instance_supply, + cluster_max_supply=cluster_supply) + demand_selector = KubeDefaultDemandSelector(grid_engine=grid_engine) + launch_adapter = KubeLaunchAdapter() else: grid_engine = SunGridEngine(cmd_executor=cmd_executor, queue=queue_name, hostlist=queue_hostlist_name, queue_default=queue_default) @@ -410,6 +416,7 @@ def get_daemon(): demand_selector = SunGridEngineDefaultDemandSelector(grid_engine=grid_engine) if custom_requirements: demand_selector = SunGridEngineCustomDemandSelector(inner=demand_selector, grid_engine=grid_engine) + launch_adapter = SunGridEngineLaunchAdapter(queue=queue_name, hostlist=queue_hostlist_name) host_storage = FileSystemHostStorage(cmd_executor=cmd_executor, storage_file=host_storage_file, clock=clock) host_storage = ThreadSafeHostStorage(host_storage) @@ -438,12 +445,15 @@ def get_daemon(): .format(instance.name, instance.cpu, instance.gpu, instance.mem) for instance in available_instances))) + instance_launch_params = fetch_instance_launch_params(api, launch_adapter, cluster_master_run_id) + worker_tags_handler = CloudPipelineWorkerTagsHandler(api=api, active_timeout=active_timeout, active_tag=grid_engine_type + '_IN_USE', host_storage=host_storage, static_host_storage=static_host_storage, clock=clock, common_utils=common_utils, dry_run=dry_run) scale_up_handler = GridEngineScaleUpHandler(cmd_executor=cmd_executor, api=api, grid_engine=grid_engine, + launch_adapter=launch_adapter, host_storage=host_storage, parent_run_id=cluster_master_run_id, instance_disk=instance_disk, instance_image=instance_image, @@ -489,7 +499,7 @@ def get_daemon(): max_additional_hosts=autoscale_instance_number, idle_timeout=scale_down_idle_timeout, clock=clock) daemon = GridEngineAutoscalingDaemon(autoscaler=autoscaler, worker_validator=worker_validator, - worker_tags_handler=worker_tags_handler, polling_timeout=10) + worker_tags_handler=worker_tags_handler, polling_timeout=polling_delay) if dry_init: daemon = DoNothingAutoscalingDaemon() return daemon diff --git a/workflows/pipe-common/scripts/manage_sge_profiles.py b/workflows/pipe-common/scripts/manage_sge_profiles.py index 87f2b62909..8d623e5905 100644 --- a/workflows/pipe-common/scripts/manage_sge_profiles.py +++ b/workflows/pipe-common/scripts/manage_sge_profiles.py @@ -462,7 +462,7 @@ def _launch_autoscaler(self, profile, autoscaling_script_path): self._logger.debug('Launching grid engine queue {} autoscaling...'.format(profile.name)) self._executor.execute(""" source "{autoscaling_profile_path}" -nohup "$CP_PYTHON2_PATH" "{autoscaling_script_path}" >"$LOG_DIR/.nohup.autoscaler.$CP_CAP_SGE_QUEUE_NAME.log" 2>&1 & +nohup "$CP_PYTHON2_PATH" "{autoscaling_script_path}" >"$LOG_DIR/.nohup.autoscaler.sge.$CP_CAP_SGE_QUEUE_NAME.log" 2>&1 & """.format(autoscaling_profile_path=profile.path_queue, autoscaling_script_path=autoscaling_script_path)) self._logger.info('Grid engine {} autoscaling has been 
launched.'.format(profile.name))
diff --git a/workflows/pipe-common/shell/kube_setup_master b/workflows/pipe-common/shell/kube_setup_master
index c0769edd7c..89ac462713 100644
--- a/workflows/pipe-common/shell/kube_setup_master
+++ b/workflows/pipe-common/shell/kube_setup_master
@@ -375,3 +375,12 @@ if [ "$CP_CAP_KUBE_DNS_ADD_TO_HOST" == "true" ]; then
     echo "options rotate" >> /tmp/resolv.conf
     \cp /tmp/resolv.conf /etc/resolv.conf
 fi
+
+(
+    export CP_CAP_AUTOSCALE_TASK="KubeAutoscaling"
+    export CP_CAP_SGE_QUEUE_DEFAULT="true"
+    export CP_CAP_SGE_QUEUE_STATIC="true"
+    export CP_CAP_SGE_QUEUE_NAME="default"
+    export CP_CAP_SGE_MASTER_CORES="0"
+    nohup "$CP_PYTHON2_PATH" "$COMMON_REPO_DIR/scripts/autoscale_grid_engine.py" >"$LOG_DIR/.nohup.autoscaler.kube.default.log" 2>&1 &
+)
diff --git a/workflows/pipe-common/test/test_kube_demand_selector_default.py b/workflows/pipe-common/test/test_kube_demand_selector_default.py
new file mode 100644
index 0000000000..c4c49d1533
--- /dev/null
+++ b/workflows/pipe-common/test/test_kube_demand_selector_default.py
@@ -0,0 +1,120 @@
+# Copyright 2017-2023 EPAM Systems, Inc. (https://www.epam.com/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ +import logging + +import pytest +from mock import MagicMock, Mock + +from pipeline.hpc.engine.gridengine import GridEngineJob +from pipeline.hpc.engine.kube import KubeDefaultDemandSelector +from pipeline.hpc.resource import IntegralDemand, ResourceSupply + +logging.basicConfig(level=logging.DEBUG, format='%(asctime)s [%(threadName)s] [%(levelname)s] %(message)s') + +grid_engine = Mock() +owner = 'owner' + +job_args = {'id': '1', 'root_id': 1, 'name': '', 'user': owner, 'state': '', 'datetime': ''} + +job_2cpu = GridEngineJob(cpu=2, mem=0, **job_args) +job_4cpu = GridEngineJob(cpu=4, mem=0, **job_args) +job_8cpu = GridEngineJob(cpu=8, mem=0, **job_args) + +job_2cpu_host = GridEngineJob(cpu=2, mem=0, hosts=['host'], **job_args) +job_4cpu_host = GridEngineJob(cpu=4, mem=0, hosts=['host'], **job_args) +job_8cpu_host = GridEngineJob(cpu=8, mem=0, hosts=['host'], **job_args) + +job_8cpu32mem = GridEngineJob(cpu=8, mem=32, **job_args) +job_32cpu128mem = GridEngineJob(cpu=32, mem=128, **job_args) + +test_cases = [ + ['2cpu and 4cpu and 8cpu jobs using 0cpu supply', + [job_2cpu, + job_4cpu, + job_8cpu], + [ResourceSupply(cpu=0)], + [IntegralDemand(cpu=2, owner=owner), + IntegralDemand(cpu=4, owner=owner), + IntegralDemand(cpu=8, owner=owner)]], + + ['2cpu host and 4cpu and 8cpu jobs using 0cpu supply', + [job_2cpu_host, + job_4cpu, + job_8cpu], + [ResourceSupply(cpu=0)], + [IntegralDemand(cpu=4, owner=owner), + IntegralDemand(cpu=8, owner=owner)]], + + ['2cpu host and 4cpu host and 8cpu host jobs using 0cpu supply', + [job_2cpu_host, + job_4cpu_host, + job_8cpu_host], + [ResourceSupply(cpu=0)], + []], + + ['2cpu and 4cpu and 8cpu jobs using 2cpu supply', + [job_2cpu, + job_4cpu, + job_8cpu], + [ResourceSupply(cpu=2)], + [IntegralDemand(cpu=4, owner=owner), + IntegralDemand(cpu=8, owner=owner)]], + + ['2cpu and 4cpu and 8cpu jobs using 16cpu supply', + [job_2cpu, + job_4cpu, + job_8cpu], + [ResourceSupply(cpu=16)], + []], + + ['8cpu/32mem job using 0cpu supply', + [job_8cpu32mem], + [ResourceSupply(cpu=0)], + [IntegralDemand(cpu=8, mem=32, owner=owner)]], + + ['8cpu/32mem and 32cpu/128mem job using 0cpu supply', + [job_8cpu32mem, + job_32cpu128mem], + [ResourceSupply(cpu=0)], + [IntegralDemand(cpu=8, mem=32, owner=owner), + IntegralDemand(cpu=32, mem=128, owner=owner)]], + + ['8cpu/32mem job using 32cpu/128mem supply', + [job_8cpu32mem], + [ResourceSupply(cpu=32, mem=128)], + []], +] + + +@pytest.mark.parametrize('jobs,resource_supplies,required_resource_demands', + [test_case[1:] for test_case in test_cases], + ids=[test_case[0] for test_case in test_cases]) +def test_select(jobs, resource_supplies, required_resource_demands): + grid_engine.get_host_supplies = MagicMock(return_value=iter(resource_supplies)) + demand_selector = KubeDefaultDemandSelector(grid_engine=grid_engine) + actual_resource_demands = list(demand_selector.select(jobs)) + assert required_resource_demands == actual_resource_demands diff --git a/workflows/pipe-common/test/test_kube_grid_engine.py b/workflows/pipe-common/test/test_kube_grid_engine.py new file mode 100644 index 0000000000..705741cc73 --- /dev/null +++ b/workflows/pipe-common/test/test_kube_grid_engine.py @@ -0,0 +1,281 @@ +# Copyright 2017-2023 EPAM Systems, Inc. (https://www.epam.com/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+import pykube
+from datetime import datetime
+from mock import MagicMock, Mock
+
+from pipeline.hpc.engine.gridengine import GridEngineJobState
+from pipeline.hpc.engine.kube import KubeGridEngine, KubeResourceParser
+
+pykube.Pod = Mock()
+
+logging.basicConfig(level=logging.DEBUG, format='%(asctime)s [%(threadName)s] [%(levelname)s] %(message)s')
+
+QUEUE = 'main.q'
+HOSTLIST = '@allhosts'
+QUEUE_DEFAULT = True
+OWNER = 'owner'
+
+kube = Mock()
+resource_parser = KubeResourceParser()
+grid_engine = KubeGridEngine(kube=kube, resource_parser=resource_parser, owner=OWNER)
+
+
+def setup_function():
+    pass
+
+
+def test_get_jobs():
+    pykube.Pod.objects = MagicMock(return_value=[
+        _get_running_pod(name='pod-1', cpu='1', mem='1Gi'),
+        _get_pending_pod(name='pod-2', cpu='2', mem='2Gi'),
+        _get_pending_pod(name='pod-3', cpu=None, mem=None)])
+
+    jobs = grid_engine.get_jobs()
+
+    assert len(jobs) == 3
+    job_1, job_2, job_3 = jobs
+
+    assert job_1.root_id == '8ff147ce-1724-4d8e-a76e-6a02852ec6f7'
+    assert job_1.id == '8ff147ce-1724-4d8e-a76e-6a02852ec6f7'
+    assert job_1.name == 'pod-1'
+    assert job_1.user == OWNER
+    assert job_1.state == GridEngineJobState.RUNNING
+    assert job_1.cpu == 1
+    assert job_1.gpu == 0
+    assert job_1.mem == 1
+    assert job_1.datetime == datetime(2023, 12, 27,
+                                      9, 52, 37)
+    assert job_1.hosts == ['pipeline-53659']
+
+    assert job_2.root_id == '7d1480ae-22a2-4614-bb40-eaf8e70f499d'
+    assert job_2.id == '7d1480ae-22a2-4614-bb40-eaf8e70f499d'
+    assert job_2.name == 'pod-2'
+    assert job_2.user == OWNER
+    assert job_2.state == GridEngineJobState.PENDING
+    assert job_2.cpu == 2
+    assert job_2.gpu == 0
+    assert job_2.mem == 2
+    assert job_2.datetime == datetime(2023, 12, 27,
+                                      9, 52, 47)
+    assert job_2.hosts == []
+
+    assert job_3.root_id == '7d1480ae-22a2-4614-bb40-eaf8e70f499d'
+    assert job_3.id == '7d1480ae-22a2-4614-bb40-eaf8e70f499d'
+    assert job_3.name == 'pod-3'
+    assert job_3.user == OWNER
+    assert job_3.state == GridEngineJobState.PENDING
+    assert job_3.cpu == 1
+    assert job_3.gpu == 0
+    assert job_3.mem == 0
+    assert job_3.datetime == datetime(2023, 12, 27,
+                                      9, 52, 47)
+    assert job_3.hosts == []
+
+
+def _get_pending_pod(name, cpu, mem):
+    pod = Mock()
+    pod.obj = _get_pending_pod_obj(name=name, cpu=cpu, mem=mem)
+    pod.name = pod.obj.get('metadata', {}).get('name')
+    return pod
+
+
+def _get_running_pod(name, cpu, mem):
+    pod = Mock()
+    pod.obj = _get_running_pod_obj(name=name, cpu=cpu, mem=mem)
+    pod.name = pod.obj.get('metadata', {}).get('name')
+    return pod
+
+
+def _get_running_pod_obj(name,
cpu, mem): + return { + "status": { + "hostIP": "10.244.49.68", + "qosClass": "Guaranteed", + "containerStatuses": [ + { + "restartCount": 0, + "name": "nginx", + "started": True, + "image": "docker.io/library/nginx:1.14.2", + "imageID": "docker.io/library/nginx@sha256:f7988fb6c02e0ce69257d9bd9cf37ae20a60f1df7563c3a2a6abe24160306b8d", + "state": { + "running": { + "startedAt": "2023-12-27T09:52:42Z" + } + }, + "ready": True, + "lastState": {}, + "containerID": "containerd://c3ca0ac0ae07e9e031bc365c2821c774be7996ac5b5ca4467ccbf662641ea331" + } + ], + "podIP": "172.16.1.2", + "startTime": "2023-12-27T09:52:37Z", + "podIPs": [ + { + "ip": "172.16.1.2" + } + ], + "phase": "Running", + "conditions": [ + { + "status": "True", + "lastProbeTime": None, + "type": "Initialized", + "lastTransitionTime": "2023-12-27T09:52:37Z" + }, + { + "status": "True", + "lastProbeTime": None, + "type": "Ready", + "lastTransitionTime": "2023-12-27T09:52:42Z" + }, + { + "status": "True", + "lastProbeTime": None, + "type": "ContainersReady", + "lastTransitionTime": "2023-12-27T09:52:42Z" + }, + { + "status": "True", + "lastProbeTime": None, + "type": "PodScheduled", + "lastTransitionTime": "2023-12-27T09:52:37Z" + } + ] + }, + "spec": { + "dnsPolicy": "ClusterFirst", + "securityContext": {}, + "serviceAccountName": "default", + "schedulerName": "default-scheduler", + "enableServiceLinks": True, + "serviceAccount": "default", + "priority": 0, + "terminationGracePeriodSeconds": 30, + "restartPolicy": "Always", + "tolerations": [ + { + "operator": "Exists", + "tolerationSeconds": 300, + "effect": "NoExecute", + "key": "node.kubernetes.io/not-ready" + }, + { + "operator": "Exists", + "tolerationSeconds": 300, + "effect": "NoExecute", + "key": "node.kubernetes.io/unreachable" + } + ], + "preemptionPolicy": "PreemptLowerPriority", + "containers": [ + { + "name": "nginx", + "image": "nginx:1.14.2", + "resources": { + "requests": { + "cpu": cpu, + "memory": mem + } + } + } + ], + "nodeName": "pipeline-53659" + }, + "metadata": { + "name": name, + "namespace": "default", + "creationTimestamp": "2023-12-27T09:52:37Z", + "uid": "8ff147ce-1724-4d8e-a76e-6a02852ec6f7" + } + } + + +def _get_pending_pod_obj(name, cpu, mem): + return { + "status": { + "phase": "Pending", + "conditions": [ + { + "status": "False", + "lastTransitionTime": "2023-12-27T09:52:47Z", + "reason": "Unschedulable", + "lastProbeTime": None, + "message": "0/2 nodes are available: 1 Insufficient cpu, 1 node(s) had untolerated taint {node-role.kubernetes.io/master: }. 
preemption: 0/2 nodes are available: 1 No preemption victims found for incoming pod, 1 Preemption is not helpful for scheduling..",
+                    "type": "PodScheduled"
+                }
+            ],
+            "qosClass": "Guaranteed"
+        },
+        "spec": {
+            "dnsPolicy": "ClusterFirst",
+            "securityContext": {},
+            "serviceAccountName": "default",
+            "schedulerName": "default-scheduler",
+            "enableServiceLinks": True,
+            "serviceAccount": "default",
+            "priority": 0,
+            "terminationGracePeriodSeconds": 30,
+            "restartPolicy": "Always",
+            "tolerations": [
+                {
+                    "operator": "Exists",
+                    "tolerationSeconds": 300,
+                    "effect": "NoExecute",
+                    "key": "node.kubernetes.io/not-ready"
+                },
+                {
+                    "operator": "Exists",
+                    "tolerationSeconds": 300,
+                    "effect": "NoExecute",
+                    "key": "node.kubernetes.io/unreachable"
+                }
+            ],
+            "containers": [
+                {
+                    "name": "nginx",
+                    "image": "nginx:1.14.2",
+                    "resources": {
+                        "requests": {
+                            "cpu": cpu,
+                            "memory": mem
+                        }
+                    }
+                }
+            ],
+            "preemptionPolicy": "PreemptLowerPriority"
+        },
+        "metadata": {
+            "name": name,
+            "namespace": "default",
+            "creationTimestamp": "2023-12-27T09:52:47Z",
+            "uid": "7d1480ae-22a2-4614-bb40-eaf8e70f499d"
+        }
+    }
diff --git a/workflows/pipe-common/test/test_kube_resource_parser.py b/workflows/pipe-common/test/test_kube_resource_parser.py
new file mode 100644
index 0000000000..051740b3a1
--- /dev/null
+++ b/workflows/pipe-common/test/test_kube_resource_parser.py
@@ -0,0 +1,66 @@
+# Copyright 2017-2023 EPAM Systems, Inc. (https://www.epam.com/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ +from datetime import datetime + +from pipeline.hpc.engine.kube import KubeResourceParser + +parser = KubeResourceParser() + + +def test_parse_cpu(): + assert parser.parse_cpu('') == 0 + assert parser.parse_cpu('0') == 0 + assert parser.parse_cpu('1') == 1 + assert parser.parse_cpu('2') == 2 + assert parser.parse_cpu('16') == 16 + assert parser.parse_cpu('100m') == 1 + assert parser.parse_cpu('1000m') == 1 + assert parser.parse_cpu('1100m') == 2 + assert parser.parse_cpu('4000m') == 4 + assert parser.parse_cpu('4100m') == 5 + + +def test_parse_mem(): + assert parser.parse_mem('') == 0 + assert parser.parse_mem('0') == 0 + assert parser.parse_mem('1G') == 1 + assert parser.parse_mem('2Gi') == 2 + assert parser.parse_mem('2G') == 2 + assert parser.parse_mem('1000Mi') == 1 + assert parser.parse_mem('1000M') == 1 + assert parser.parse_mem('1024Mi') == 1 + assert parser.parse_mem('1024M') == 1 + assert parser.parse_mem('4000Mi') == 4 + assert parser.parse_mem('4000M') == 4 + assert parser.parse_mem('4096Mi') == 4 + assert parser.parse_mem('4096M') == 4 + + +def test_parse_date(): + assert parser.parse_date('2023-12-27T09:52:47Z') \ + == datetime(2023, 12, 27, + 9, 52, 47) diff --git a/workflows/pipe-common/test/test_scale_up_handler.py b/workflows/pipe-common/test/test_scale_up_handler.py index 4246ee6ea5..4abbf6a0b1 100644 --- a/workflows/pipe-common/test/test_scale_up_handler.py +++ b/workflows/pipe-common/test/test_scale_up_handler.py @@ -35,6 +35,7 @@ cmd_executor = Mock() grid_engine = Mock() api = Mock() +launch_adapter = Mock() host_storage = MemoryHostStorage() instance_helper = Mock() parent_run_id = 'parent_run_id' @@ -53,6 +54,7 @@ hostlist = '@allhosts' run_id_queue = Queue() scale_up_handler = GridEngineScaleUpHandler(cmd_executor=cmd_executor, api=api, grid_engine=grid_engine, + launch_adapter=launch_adapter, host_storage=host_storage, parent_run_id=parent_run_id, instance_disk=instance_disk, instance_image=instance_image, cmd_template=cmd_template, price_type=price_type, diff --git a/workflows/pipe-common/test/test_sge_grid_engine.py b/workflows/pipe-common/test/test_sge_grid_engine.py index 05d0ff5435..39495574ab 100644 --- a/workflows/pipe-common/test/test_sge_grid_engine.py +++ b/workflows/pipe-common/test/test_sge_grid_engine.py @@ -91,6 +91,8 @@ def test_qstat_parsing(): job_1, job_2 = jobs + assert job_1.root_id == '1' + assert job_1.id == '1' assert job_1.name == 'name1' assert job_1.user == 'root' assert job_1.state == GridEngineJobState.RUNNING @@ -99,8 +101,10 @@ def test_qstat_parsing(): assert job_1.mem == 0 assert job_1.datetime == datetime(2018, 12, 21, 11, 48, 00) - assert 'pipeline-38415' in job_1.hosts + assert job_1.hosts == ['pipeline-38415'] + assert job_2.root_id == '2' + assert job_2.id == '2' assert job_2.name == 'name2' assert job_2.user == 'someUser' assert job_2.state == GridEngineJobState.PENDING @@ -109,7 +113,7 @@ def test_qstat_parsing(): assert job_2.mem == 5 assert job_2.datetime == datetime(2018, 12, 21, 12, 39, 38) - assert len(job_2.hosts) == 0 + assert job_2.hosts == [] def test_qstat_array_job_parsing(): diff --git a/workflows/pipe-common/test/test_slurm_grid_engine.py b/workflows/pipe-common/test/test_slurm_grid_engine.py index b4ef428033..b24768c257 100644 --- a/workflows/pipe-common/test/test_slurm_grid_engine.py +++ b/workflows/pipe-common/test/test_slurm_grid_engine.py @@ -37,8 +37,8 @@ def setup_function(): def test_qstat_parsing(): stdout = """JobId=8 JobName=wrap1 UserId=root(0) GroupId=root(0) MCS_label=N/A Priority=4294901752 
Nice=0 Account=(null) QOS=(null) JobState=RUNNING Reason=None Dependency=(null) Requeue=1 Restarts=0 BatchFlag=1 Reboot=0 ExitCode=0:0 RunTime=00:00:15 TimeLimit=365-00:00:00 TimeMin=N/A SubmitTime=2023-07-31T13:55:30 EligibleTime=2023-07-31T13:55:30 AccrueTime=2023-07-31T13:55:30 StartTime=2023-07-31T13:55:30 EndTime=2024-07-30T13:55:30 Deadline=N/A SuspendTime=None SecsPreSuspend=0 LastSchedEval=2023-07-31T13:55:30 Scheduler=Backfill Partition=main.q AllocNode:Sid=pipeline-47608:6013 ReqNodeList=(null) ExcNodeList=(null) NodeList=pipeline-47608 BatchHost=pipeline-47608 NumNodes=1 NumCPUs=1 NumTasks=1 CPUs/Task=1 ReqB:S:C:T=0:0:*:* TRES=cpu=1,node=1,billing=1 Socks/Node=* NtasksPerN:B:S:C=0:0:*:* CoreSpec=* MinCPUsNode=1 MinMemoryNode=0 MinTmpDiskNode=0 Features=(null) DelayBoot=00:00:00 OverSubscribe=OK Contiguous=0 Licenses=(null) Network=(null) Command=(null) WorkDir=/root StdErr=/root/slurm-8.out StdIn=/dev/null StdOut=/root/slurm-8.out Power= -JobId=9 JobName=wrap2 UserId=root(0) GroupId=root(0) MCS_label=N/A Priority=4294901752 Nice=0 Account=(null) QOS=(null) JobState=PENDING Reason=None Dependency=(null) Requeue=1 Restarts=0 BatchFlag=1 Reboot=0 ExitCode=0:0 RunTime=00:00:15 TimeLimit=365-00:00:00 TimeMin=N/A SubmitTime=2023-07-31T13:55:35 EligibleTime=2023-07-31T13:55:35 AccrueTime=Unknown StartTime=Unknown EndTime=2024-07-30T13:55:30 Deadline=N/A SuspendTime=None SecsPreSuspend=0 LastSchedEval=2023-07-31T13:55:30 Scheduler=Backfill Partition=main.q AllocNode:Sid=pipeline-47608:6013 ReqNodeList=(null) ExcNodeList=(null) NodeList= BatchHost=pipeline-47608 NumNodes=1 NumCPUs=2 NumTasks=1 CPUs/Task=2 ReqB:S:C:T=0:0:*:* TRES=cpu=2,node=1,billing=1 Socks/Node=* NtasksPerN:B:S:C=0:0:*:* CoreSpec=* MinCPUsNode=1 MinMemoryNode=500M MinTmpDiskNode=0 Features=(null) DelayBoot=00:00:00 OverSubscribe=OK Contiguous=0 Licenses=(null) Network=(null) Command=(null) WorkDir=/root StdErr=/root/slurm-8.out StdIn=/dev/null StdOut=/root/slurm-8.out Power= -JobId=10 JobName=wrap3 UserId=user-name(123) GroupId=root(0) MCS_label=N/A Priority=4294901752 Nice=0 Account=(null) QOS=(null) JobState=PENDING Reason=None Dependency=(null) Requeue=1 Restarts=0 BatchFlag=1 Reboot=0 ExitCode=0:0 RunTime=00:00:15 TimeLimit=365-00:00:00 TimeMin=N/A SubmitTime=2023-07-31T13:55:35 EligibleTime=2023-07-31T13:55:35 AccrueTime=Unknown StartTime=Unknown EndTime=2024-07-30T13:55:30 Deadline=N/A SuspendTime=None SecsPreSuspend=0 LastSchedEval=2023-07-31T13:55:30 Scheduler=Backfill Partition=main.q AllocNode:Sid=pipeline-47608:6013 ReqNodeList=(null) ExcNodeList=(null) NodeList= BatchHost=pipeline-47608 NumNodes=1 NumCPUs=2 NumTasks=1 CPUs/Task=2 ReqB:S:C:T=0:0:*:* TRES=cpu=2,node=1,billing=1 Socks/Node=* NtasksPerN:B:S:C=0:0:*:* CoreSpec=* MinCPUsNode=1 MinMemoryNode=500M MinTmpDiskNode=0 Features=(null) DelayBoot=00:00:00 OverSubscribe=OK Contiguous=0 Licenses=(null) Network=(null) Command=(null) WorkDir=/root StdErr=/root/slurm-8.out StdIn=/dev/null StdOut=/root/slurm-8.out Power= +JobId=9 JobName=wrap2 UserId=root(0) GroupId=root(0) MCS_label=N/A Priority=4294901752 Nice=0 Account=(null) QOS=(null) JobState=PENDING Reason=None Dependency=(null) Requeue=1 Restarts=0 BatchFlag=1 Reboot=0 ExitCode=0:0 RunTime=00:00:15 TimeLimit=365-00:00:00 TimeMin=N/A SubmitTime=2023-07-31T13:55:35 EligibleTime=2023-07-31T13:55:35 AccrueTime=Unknown StartTime=Unknown EndTime=2024-07-30T13:55:30 Deadline=N/A SuspendTime=None SecsPreSuspend=0 LastSchedEval=2023-07-31T13:55:30 Scheduler=Backfill Partition=main.q 
AllocNode:Sid=pipeline-47608:6013 ReqNodeList=(null) ExcNodeList=(null) NodeList= BatchHost=pipeline-47608 NumNodes=1 NumCPUs=2 NumTasks=1 CPUs/Task=2 ReqB:S:C:T=0:0:*:* TRES=cpu=2,node=1,billing=1 Socks/Node=* NtasksPerN:B:S:C=0:0:*:* CoreSpec=* MinCPUsNode=2 MinMemoryNode=500M MinTmpDiskNode=0 Features=(null) DelayBoot=00:00:00 OverSubscribe=OK Contiguous=0 Licenses=(null) Network=(null) Command=(null) WorkDir=/root StdErr=/root/slurm-8.out StdIn=/dev/null StdOut=/root/slurm-8.out Power= +JobId=10 JobName=wrap3 UserId=user-name(123) GroupId=root(0) MCS_label=N/A Priority=4294901752 Nice=0 Account=(null) QOS=(null) JobState=PENDING Reason=None Dependency=(null) Requeue=1 Restarts=0 BatchFlag=1 Reboot=0 ExitCode=0:0 RunTime=00:00:15 TimeLimit=365-00:00:00 TimeMin=N/A SubmitTime=2023-07-31T13:55:35 EligibleTime=2023-07-31T13:55:35 AccrueTime=Unknown StartTime=Unknown EndTime=2024-07-30T13:55:30 Deadline=N/A SuspendTime=None SecsPreSuspend=0 LastSchedEval=2023-07-31T13:55:30 Scheduler=Backfill Partition=main.q AllocNode:Sid=pipeline-47608:6013 ReqNodeList=(null) ExcNodeList=(null) NodeList= BatchHost=pipeline-47608 NumNodes=1 NumCPUs=2 NumTasks=1 CPUs/Task=2 ReqB:S:C:T=0:0:*:* TRES=cpu=2,node=1,billing=1 Socks/Node=* NtasksPerN:B:S:C=0:0:*:* CoreSpec=* MinCPUsNode=2 MinMemoryNode=500M MinTmpDiskNode=0 Features=(null) DelayBoot=00:00:00 OverSubscribe=OK Contiguous=0 Licenses=(null) Network=(null) Command=(null) WorkDir=/root StdErr=/root/slurm-8.out StdIn=/dev/null StdOut=/root/slurm-8.out Power= """ executor.execute = MagicMock(return_value=stdout) @@ -48,6 +48,8 @@ def test_qstat_parsing(): job_1, job_2, job_3 = jobs + assert job_1.root_id == '8' + assert job_1.id == '8_0' assert job_1.name == 'wrap1' assert job_1.user == 'root' assert job_1.state == GridEngineJobState.RUNNING @@ -57,6 +59,8 @@ def test_qstat_parsing(): assert job_1.datetime == datetime(2023, 7, 31, 13, 55, 30) + assert job_2.root_id == '9' + assert job_2.id == '9_0' assert job_2.name == 'wrap2' assert job_2.user == 'root' assert job_2.state == GridEngineJobState.PENDING @@ -66,6 +70,8 @@ def test_qstat_parsing(): assert job_2.datetime == datetime(2023, 7, 31, 13, 55, 35) + assert job_3.root_id == '10' + assert job_3.id == '10_0' assert job_3.name == 'wrap3' assert job_3.user == 'user-name' assert job_3.state == GridEngineJobState.PENDING
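
A note on the resource arithmetic introduced above: GridEngineResourceParser normalizes Kubernetes quantity strings to whole units, rounding up with math.ceil, so KubeResourceParser yields whole CPU cores and whole GiB of memory. A minimal usage sketch; the expected values are taken from test_kube_resource_parser.py in this patch:

    from pipeline.hpc.engine.kube import KubeResourceParser

    parser = KubeResourceParser()

    # CPU quantities: '4000m' is exactly 4 cores, '4100m' rounds up to 5.
    assert parser.parse_cpu('4000m') == 4
    assert parser.parse_cpu('4100m') == 5

    # Memory quantities: both decimal ('M', 'G') and binary ('Mi', 'Gi')
    # suffixes are normalized to whole GiB, rounding up: 2G (~1.86 GiB) -> 2.
    assert parser.parse_mem('1024Mi') == 1
    assert parser.parse_mem('2G') == 2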
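
The launch adapter abstraction replaces the engine-type branching that previously lived in GridEngineScaleUpHandler.get_grid_engine_worker_task_name: each engine now supplies its own worker init task name and worker launch parameters, and fetch_instance_launch_params merges the latter into the worker run. A sketch of the wiring, assuming a hypothetical get_launch_adapter helper (autoscale_grid_engine.py performs the same branching inline in get_daemon):

    from pipeline.hpc.engine.gridengine import GridEngineType
    from pipeline.hpc.engine.kube import KubeLaunchAdapter
    from pipeline.hpc.engine.sge import SunGridEngineLaunchAdapter
    from pipeline.hpc.engine.slurm import SlurmLaunchAdapter


    def get_launch_adapter(grid_engine_type, queue, hostlist):
        # Hypothetical helper: picks the adapter matching the engine type.
        if grid_engine_type == GridEngineType.KUBE:
            return KubeLaunchAdapter()
        if grid_engine_type == GridEngineType.SLURM:
            return SlurmLaunchAdapter()
        return SunGridEngineLaunchAdapter(queue=queue, hostlist=hostlist)


    adapter = get_launch_adapter(GridEngineType.KUBE, 'main.q', '@allhosts')
    assert adapter.get_worker_init_task_name() == 'KubeWorkerSetup'
    assert adapter.get_worker_launch_params() == {'CP_CAP_KUBE': 'false'}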