diff --git a/oar/kao/custom_scheduling.py b/oar/kao/custom_scheduling.py index 52e40e0c..2e0e61f9 100644 --- a/oar/kao/custom_scheduling.py +++ b/oar/kao/custom_scheduling.py @@ -8,7 +8,7 @@ logger = get_logger("oar.custom_scheduling") -def compact(itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True): +def compact(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True): """ Given a job resource request and a set of resources this function tries to find a matching allocation. @@ -78,7 +78,7 @@ def compact(itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True): return result -def spread(itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True): +def spread(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True): """ Given a job resource request and a set of resources this function tries to find a matching allocation. @@ -150,7 +150,7 @@ def spread(itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True): return result -def co_loc(itvs_slots, hy_res_rqts, hy, beginning_slotset): +def co_loc(session, itvs_slots, hy_res_rqts, hy, beginning_slotset): """ Given a job resource request and a set of resources this function tries to find a matching allocation. @@ -165,10 +165,10 @@ def co_loc(itvs_slots, hy_res_rqts, hy, beginning_slotset): :return [ProcSet]: \ The allocation if found, otherwise an empty :class:`procset.ProcSet` """ - return spread(itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=False) + return spread(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=False) -def no_pref(itvs_slots, hy_res_rqts, hy, beginning_slotset): +def no_pref(session, itvs_slots, hy_res_rqts, hy, beginning_slotset): """ Given a job resource request and a set of resources this function tries to find a matching allocation. @@ -183,10 +183,10 @@ def no_pref(itvs_slots, hy_res_rqts, hy, beginning_slotset): :return [ProcSet]: \ The allocation if found, otherwise an empty :class:`procset.ProcSet` """ - return compact(itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=False) + return compact(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=False) -def f_spread(itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True): +def f_spread(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True): """ Given a job resource request and a set of resources this function tries to find a matching allocation. @@ -202,18 +202,18 @@ def f_spread(itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True): The allocation if found, otherwise an empty :class:`procset.ProcSet` """ avail_procset = spread( - itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=reverse + session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=reverse ) # if no allocation space is found (by compact policy) # fallback to compact/no_pref if len(avail_procset) == 0: - return compact(itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=reverse) + return compact(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=reverse) else: return avail_procset -def f_co_loc(itvs_slots, hy_res_rqts, hy, beginning_slotset): +def f_co_loc(session, itvs_slots, hy_res_rqts, hy, beginning_slotset): """ Given a job resource request and a set of resources this function tries to find a matching allocation. @@ -228,4 +228,4 @@ def f_co_loc(itvs_slots, hy_res_rqts, hy, beginning_slotset): :return [ProcSet]: \ The allocation if found, otherwise an empty :class:`procset.ProcSet` """ - return f_spread(itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=False) + return f_spread(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=False) diff --git a/oar/kao/kamelot.py b/oar/kao/kamelot.py index d0bef78f..269539b8 100755 --- a/oar/kao/kamelot.py +++ b/oar/kao/kamelot.py @@ -95,6 +95,7 @@ def internal_schedule_cycle( # Scheduled # schedule_id_jobs_ct( + session, all_slot_sets, waiting_jobs, resource_set.hierarchy, @@ -196,6 +197,7 @@ def schedule_cycle( # Scheduled # schedule_id_jobs_ct( + session, all_slot_sets, waiting_jobs, resource_set.hierarchy, diff --git a/oar/kao/kamelot_basic.py b/oar/kao/kamelot_basic.py index 004f18f2..b1544846 100644 --- a/oar/kao/kamelot_basic.py +++ b/oar/kao/kamelot_basic.py @@ -77,7 +77,7 @@ def schedule_cycle(session, config, plt, queues=["default"]): # Scheduled # schedule_id_jobs_ct( - all_slot_sets, waiting_jobs, resource_set.hierarchy, waiting_jids, 0 + session, all_slot_sets, waiting_jobs, resource_set.hierarchy, waiting_jids, 0 ) # diff --git a/oar/kao/scheduling.py b/oar/kao/scheduling.py index eea50dfc..57cb6e68 100644 --- a/oar/kao/scheduling.py +++ b/oar/kao/scheduling.py @@ -6,6 +6,7 @@ from typing import Any, Tuple from procset import ProcSet +from sqlalchemy.orm import Session from oar.kao.quotas import Quotas from oar.kao.slot import Slot, SlotSet, intersec_itvs_slots, intersec_ts_ph_itvs_slots @@ -82,13 +83,14 @@ def set_slots_with_prev_scheduled_jobs( slot_set.split_slots_jobs(jobs_slotsets[ss_name]) -def find_resource_hierarchies_job(itvs_slots, hy_res_rqts, hy): +def find_resource_hierarchies_job(session, itvs_slots, hy_res_rqts, hy): """ Given a job resource request and a set of resources this function tries to find a matching allocation. .. note:: This` can be override with the oar `extension <../admin/extensions.html#functions-assign-and-find>`_ mechanism. + :param session: The DB session :param itvs_slots: A procset of the resources available for the allocation :type itvs_slots: :class:`procset.ProcSet` :param hy_res_rqts: The job's request @@ -118,13 +120,14 @@ def find_resource_hierarchies_job(itvs_slots, hy_res_rqts, hy): def find_first_suitable_contiguous_slots_quotas( - slots_set: SlotSet, job, res_rqt: Tuple[int, int, Any], hy, min_start_time: int + session: Session, slots_set: SlotSet, job, res_rqt: Tuple[int, int, Any], hy, min_start_time: int ): """ Loop through time slices from a :py:class:`oar.kao.slot.SlotSet` that are long enough for the job's walltime. For each compatible time slice, call the function :py:func:`find_resource_hierarchies_job` to find compatible resources allocation for the job, if such allocation is found the function ends. + :param Session session: The DB session :param SlotSet slots_set: Slot set of the current platform :param Job job: The job to schedule :param res_rqt: The job resource request @@ -189,6 +192,7 @@ def find_first_suitable_contiguous_slots_quotas( ) # Use specialized find resource function itvs = job.find_func( + session, itvs_avail, hy_res_rqts, hy, @@ -197,7 +201,7 @@ def find_first_suitable_contiguous_slots_quotas( **job.find_kwargs, ) else: - itvs = find_resource_hierarchies_job(itvs_avail, hy_res_rqts, hy) + itvs = find_resource_hierarchies_job(session, itvs_avail, hy_res_rqts, hy) if len(itvs) != 0: nb_res = len(itvs & ResourceSet.default_itvs) @@ -226,13 +230,14 @@ def find_first_suitable_contiguous_slots_quotas( def find_first_suitable_contiguous_slots_no_quotas( - slots_set: SlotSet, job, res_rqt, hy, min_start_time: int + session: Session, slots_set: SlotSet, job, res_rqt, hy, min_start_time: int ): """ Loop through time slices from a :py:class:`oar.kao.slot.SlotSet` that are long enough for the job's walltime. For each compatible time slice, call the function :py:func:`find_resource_hierarchies_job` to find compatible resources allocation for the job, if such allocation is found the function ends. + :param Session session: The DB session :param SlotSet slots_set: Slot set of the current platform :param Job job: The job to schedule :param res_rqt: The job resource request @@ -276,6 +281,7 @@ def find_first_suitable_contiguous_slots_no_quotas( if job.find: itvs = job.find_func( + session, itvs_avail, hy_res_rqts, hy, @@ -285,7 +291,7 @@ def find_first_suitable_contiguous_slots_no_quotas( **job.find_kwargs, ) else: - itvs = find_resource_hierarchies_job(itvs_avail, hy_res_rqts, hy) + itvs = find_resource_hierarchies_job(session, itvs_avail, hy_res_rqts, hy) if len(itvs) != 0: break @@ -306,13 +312,14 @@ def find_first_suitable_contiguous_slots_no_quotas( def find_first_suitable_contiguous_slots( - slots_set: SlotSet, job, res_rqt, hy, min_start_time: int + session: Session, slots_set: SlotSet, job, res_rqt, hy, min_start_time: int ): """ Loop through time slices from a :py:class:`oar.kao.slot.SlotSet` that are long enough for the job's walltime. For each compatible time slice, call the function :py:func:`find_resource_hierarchies_job` to find compatible resources allocation for the job, if such allocation is found the function ends. + :param Session session: The DB session :param SlotSet slots_set: Slot set of the current platform :param Job job: The job to schedule :param res_rqt: The job resource request @@ -322,16 +329,16 @@ def find_first_suitable_contiguous_slots( if Quotas.enabled and not job.no_quotas: return find_first_suitable_contiguous_slots_quotas( - slots_set, job, res_rqt, hy, min_start_time + session, slots_set, job, res_rqt, hy, min_start_time ) return find_first_suitable_contiguous_slots_no_quotas( - slots_set, job, res_rqt, hy, min_start_time + session, slots_set, job, res_rqt, hy, min_start_time ) def assign_resources_mld_job_split_slots( - slots_set: SlotSet, job: Job, hy, min_start_time + session: Session, slots_set: SlotSet, job: Job, hy, min_start_time ): """ According to a resources a :class:`SlotSet` find the time and the resources to launch a job. @@ -345,6 +352,7 @@ def assign_resources_mld_job_split_slots( .. note:: This function can be override with the oar `extension <../admin/extensions.html#functions-assign-and-find>`_ mechanism. + :param Session session: The DB session :param SlotSet slots_set: A :class:`SlotSet` of the current platform :param Job job: The job to schedule :param hy: \ @@ -362,7 +370,7 @@ def assign_resources_mld_job_split_slots( for res_rqt in job.mld_res_rqts: mld_id, walltime, hy_res_rqts = res_rqt res_set, sid_left, sid_right = find_first_suitable_contiguous_slots( - slots_set, job, res_rqt, hy, min_start_time + session, slots_set, job, res_rqt, hy, min_start_time ) if len(res_set) == 0: # no suitable time*resources found res_set_nfound += 1 @@ -403,11 +411,12 @@ def assign_resources_mld_job_split_slots( return prev_sid_left, prev_sid_right, job -def schedule_id_jobs_ct(slots_sets, jobs, hy, id_jobs, job_security_time): +def schedule_id_jobs_ct(session, slots_sets, jobs, hy, id_jobs, job_security_time): """ Main scheduling loop with support for jobs container - can be recursive (recursion has not been tested) Find an allocation for each waiting jobs. + :param Session session: The DB session :param SlotSet slots_sets: A :class:`SlotSet` of the current platform :param [Job] jobs: The list of the waiting jobs to schedule :param hy: \ @@ -480,6 +489,7 @@ def schedule_id_jobs_ct(slots_sets, jobs, hy, id_jobs, job_security_time): if job.assign: # Use specialized assign function job.assign_func( + session, slots_set, job, hy, @@ -488,7 +498,7 @@ def schedule_id_jobs_ct(slots_sets, jobs, hy, id_jobs, job_security_time): **job.assign_kwargs, ) else: - assign_resources_mld_job_split_slots(slots_set, job, hy, min_start_time) + assign_resources_mld_job_split_slots(session, slots_set, job, hy, min_start_time) if "container" in job.types: if job.types["container"] == "": diff --git a/oar/kao/scheduling_basic.py b/oar/kao/scheduling_basic.py index cb1f6bf8..9b24cb4d 100644 --- a/oar/kao/scheduling_basic.py +++ b/oar/kao/scheduling_basic.py @@ -2,12 +2,13 @@ from typing import Tuple from procset import ProcSet +from sqlalchemy.orm import Session from oar.kao.slot import Slot, SlotSet, intersec_itvs_slots from oar.lib.hierarchy import find_resource_hierarchies_scattered -def find_resource_hierarchies_job(itvs_slots, hy_res_rqts, hy): +def find_resource_hierarchies_job(session, itvs_slots, hy_res_rqts, hy): """ Find resources in interval for all resource subrequests of a moldable instance of a job @@ -31,7 +32,7 @@ def find_resource_hierarchies_job(itvs_slots, hy_res_rqts, hy): def find_first_suitable_contiguous_slots( - slots_set: SlotSet, job, res_rqt, hy + session: Session, slots_set: SlotSet, job, res_rqt, hy ) -> Tuple[ProcSet, int, int]: """find first_suitable_contiguous_slot""" (mld_id, walltime, hy_res_rqts) = res_rqt @@ -48,7 +49,7 @@ def find_first_suitable_contiguous_slots( for slot_begin, slot_end in slots_set.traverse_with_width(walltime): # find next contiguous slots_time itvs_avail = intersec_itvs_slots(slots, slot_begin.id, slot_end.id) - itvs = find_resource_hierarchies_job(itvs_avail, hy_res_rqts, hy) + itvs = find_resource_hierarchies_job(session, itvs_avail, hy_res_rqts, hy) if len(itvs) != 0: break @@ -59,7 +60,7 @@ def find_first_suitable_contiguous_slots( return (itvs, slot_begin.id, slot_end.id) -def assign_resources_mld_job_split_slots(slots_set: SlotSet, job, hy): +def assign_resources_mld_job_split_slots(session: Session, slots_set: SlotSet, job, hy): """ According to a resources a :class:`SlotSet` find the time and the resources to launch a job. This function supports the moldable jobs. In case of multiple moldable job corresponding to the request @@ -71,6 +72,7 @@ def assign_resources_mld_job_split_slots(slots_set: SlotSet, job, hy): One can replace this function using an extension of OAR. + :param Session session: The DB session :param ProcSet slots_set: A :class:`SlotSet` of the current platform :param [Job] job: The job to schedule :param hy: \ @@ -86,7 +88,7 @@ def assign_resources_mld_job_split_slots(slots_set: SlotSet, job, hy): for res_rqt in job.mld_res_rqts: (mld_id, walltime, hy_res_rqts) = res_rqt (res_set, sid_left, sid_right) = find_first_suitable_contiguous_slots( - slots_set, job, res_rqt, hy + session, slots_set, job, res_rqt, hy ) # print("after find fisrt suitable") t_finish = slots[sid_left].b + walltime @@ -113,7 +115,7 @@ def assign_resources_mld_job_split_slots(slots_set: SlotSet, job, hy): slots_set.split_slots(prev_sid_left, prev_sid_right, job) -def schedule_id_jobs_ct(slots_sets, jobs, hy, id_jobs, security_time): +def schedule_id_jobs_ct(session, slots_sets, jobs, hy, id_jobs, security_time): """ Schedule loop with support for jobs container - can be recursive (recursivity has not be tested) @@ -133,7 +135,7 @@ def schedule_id_jobs_ct(slots_sets, jobs, hy, id_jobs, security_time): # slots_set.show_slots() - assign_resources_mld_job_split_slots(slots_set, job, hy) + assign_resources_mld_job_split_slots(session, slots_set, job, hy) if "container" in job.types: Slot(