Skip to content

Commit

Permalink
Support kubernetes in grid engine autoscaler (#3418)
Browse files Browse the repository at this point in the history
  • Loading branch information
tcibinan authored Jan 3, 2024
1 parent 65a7285 commit f7bc82d
Show file tree
Hide file tree
Showing 15 changed files with 964 additions and 68 deletions.
17 changes: 7 additions & 10 deletions workflows/pipe-common/pipeline/hpc/autoscaler.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import itertools
import time

from pipeline.hpc.engine.gridengine import GridEngineJobState, GridEngineType
from pipeline.hpc.engine.gridengine import GridEngineJobState
from pipeline.hpc.logger import Logger
from pipeline.hpc.resource import IntegralDemand
from pipeline.hpc.utils import Clock
Expand Down Expand Up @@ -129,7 +129,7 @@ class GridEngineScaleUpHandler:
_GE_POLL_TIMEOUT = 60
_GE_POLL_ATTEMPTS = 6

def __init__(self, cmd_executor, api, grid_engine, host_storage, parent_run_id, instance_disk,
def __init__(self, cmd_executor, api, grid_engine, launch_adapter, host_storage, parent_run_id, instance_disk,
instance_image, cmd_template, price_type, region_id, queue, hostlist, owner_param_name,
polling_timeout=_POLL_TIMEOUT, polling_delay=_POLL_DELAY,
ge_polling_timeout=_GE_POLL_TIMEOUT, instance_launch_params=None, clock=Clock()):
Expand All @@ -141,6 +141,7 @@ def __init__(self, cmd_executor, api, grid_engine, host_storage, parent_run_id,
:param cmd_executor: Cmd executor.
:param api: Cloud pipeline client.
:param grid_engine: Grid engine client.
:param launch_adapter: Grid engine launch adapter.
:param host_storage: Additional hosts storage.
:param parent_run_id: Additional nodes parent run id.
:param instance_disk: Additional nodes disk size.
Expand All @@ -159,6 +160,7 @@ def __init__(self, cmd_executor, api, grid_engine, host_storage, parent_run_id,
self.executor = cmd_executor
self.api = api
self.grid_engine = grid_engine
self.launch_adapter = launch_adapter
self.host_storage = host_storage
self.parent_run_id = parent_run_id
self.instance_disk = instance_disk
Expand Down Expand Up @@ -293,8 +295,9 @@ def _await_worker_initialization(self, run_id):
if run['initialized']:
Logger.info('Additional worker #%s has been marked as initialized.' % run_id)
Logger.info('Checking additional worker #%s grid engine initialization status...' % run_id)
run_sge_tasks = self.api.load_task(run_id, self.get_grid_engine_worker_task_name())
if any(run_sge_task.get('status') == 'SUCCESS' for run_sge_task in run_sge_tasks):
run_grid_engine_tasks = self.api.load_task(run_id, self.launch_adapter.get_worker_init_task_name())
if any(run_grid_engine_task.get('status') == 'SUCCESS'
for run_grid_engine_task in run_grid_engine_tasks):
Logger.info('Additional worker #%s has been initialized.' % run_id)
return
Logger.info('Additional worker #%s hasn\'t been initialized yet. Only %s attempts remain left.'
Expand All @@ -305,12 +308,6 @@ def _await_worker_initialization(self, run_id):
Logger.warn(error_msg, crucial=True)
raise ScalingError(error_msg)

def get_grid_engine_worker_task_name(self):
if self.grid_engine.get_engine_type() == GridEngineType.SLURM:
return 'SLURMWorkerSetup'
else:
return 'SGEWorkerSetup'

def _enable_worker_in_grid_engine(self, pod):
Logger.info('Enabling additional worker %s in grid engine...' % pod.name)
attempts = self.ge_polling_timeout / self.polling_delay if self.polling_delay \
Expand Down
65 changes: 50 additions & 15 deletions workflows/pipe-common/pipeline/hpc/engine/gridengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import math
from datetime import datetime

from pipeline.hpc.logger import Logger


def _perform_command(action, msg, error_msg, skip_on_failure):
Logger.info(msg)
try:
action()
except RuntimeError as e:
except Exception as e:
Logger.warn(error_msg)
if not skip_on_failure:
raise RuntimeError(error_msg, e)
Expand All @@ -29,6 +32,7 @@ class GridEngineType:

SGE = "SGE"
SLURM = "SLURM"
KUBE = "KUBE"

def __init__(self):
pass
Expand Down Expand Up @@ -83,21 +87,11 @@ class GridEngineJobState:
COMPLETED = 'completed'
UNKNOWN = 'unknown'

_letter_codes_to_states = {
# Job statuses: [SGE] + [SLURM]
RUNNING: ['r', 't', 'Rr', 'Rt'] + ['RUNNING'],
PENDING: ['qw', 'qw', 'hqw', 'hqw', 'hRwq', 'hRwq', 'hRwq', 'qw', 'qw'] + ['PENDING'],
SUSPENDED: ['s', 'ts', 'S', 'tS', 'T', 'tT', 'Rs', 'Rts', 'RS', 'RtS', 'RT', 'RtT'] + ['SUSPENDED', 'STOPPED'],
ERROR: ['Eqw', 'Ehqw', 'EhRqw'] + ['DEADLINE', ' FAILED'],
DELETED: ['dr', 'dt', 'dRr', 'dRt', 'ds', 'dS', 'dT', 'dRs', 'dRS', 'dRT'] + ['DELETED', 'CANCELLED'],
COMPLETED: [] + ['COMPLETED', 'COMPLETING']
}

@staticmethod
def from_letter_code(code):
for key in GridEngineJobState._letter_codes_to_states:
if code in GridEngineJobState._letter_codes_to_states[key]:
return key
def from_letter_code(code, state_to_codes):
for state, codes in state_to_codes.items():
if code in codes:
return state
return GridEngineJobState.UNKNOWN


Expand Down Expand Up @@ -206,3 +200,44 @@ class GridEngineJobValidator:

def validate(self, jobs):
pass


class GridEngineLaunchAdapter:

def get_worker_init_task_name(self):
pass

def get_worker_launch_params(self):
pass


class GridEngineResourceParser:

def __init__(self, datatime_format, cpu_unit=None, cpu_modifiers=None, mem_unit=None, mem_modifiers=None):
self._datetime_format = datatime_format
self._cpu_unit = cpu_unit
self._cpu_modifiers = cpu_modifiers or {}
self._mem_unit = mem_unit
self._mem_modifiers = mem_modifiers or {}

def parse_date(self, timestamp):
return datetime.strptime(timestamp, self._datetime_format)

def parse_cpu(self, quantity):
return self._parse_resource(quantity, modifiers=self._cpu_modifiers, unit=self._cpu_unit)

def parse_mem(self, quantity):
return self._parse_resource(quantity, modifiers=self._mem_modifiers, unit=self._mem_unit)

def _parse_resource(self, quantity, modifiers, unit):
if not quantity:
return 0
if len(quantity) > 1 and quantity[-2:] in modifiers:
value, value_unit = int(quantity[:-2]), quantity[-2:]
elif len(quantity) > 0 and quantity[-1:] in modifiers:
value, value_unit = int(quantity[:-1]), quantity[-1:]
else:
value, value_unit = int(quantity), ''
modifier = modifiers[value_unit]
output = float(value * modifier)
return int(math.ceil(output / modifiers.get(unit, 1)))
Loading

0 comments on commit f7bc82d

Please sign in to comment.