Skip to content

Commit

Permalink
Pass Session through find_func gen
Browse files Browse the repository at this point in the history
  • Loading branch information
nikosT committed Aug 28, 2024
1 parent 0d33256 commit 1962ae3
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 31 deletions.
22 changes: 11 additions & 11 deletions oar/kao/custom_scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
logger = get_logger("oar.custom_scheduling")


def compact(itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True):
def compact(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True):
"""
Given a job resource request and a set of resources this function tries to find a matching allocation.
Expand Down Expand Up @@ -78,7 +78,7 @@ def compact(itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True):
return result


def spread(itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True):
def spread(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True):
"""
Given a job resource request and a set of resources this function tries to find a matching allocation.
Expand Down Expand Up @@ -150,7 +150,7 @@ def spread(itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True):
return result


def co_loc(itvs_slots, hy_res_rqts, hy, beginning_slotset):
def co_loc(session, itvs_slots, hy_res_rqts, hy, beginning_slotset):
"""
Given a job resource request and a set of resources this function tries to find a matching allocation.
Expand All @@ -165,10 +165,10 @@ def co_loc(itvs_slots, hy_res_rqts, hy, beginning_slotset):
:return [ProcSet]: \
The allocation if found, otherwise an empty :class:`procset.ProcSet`
"""
return spread(itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=False)
return spread(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=False)


def no_pref(itvs_slots, hy_res_rqts, hy, beginning_slotset):
def no_pref(session, itvs_slots, hy_res_rqts, hy, beginning_slotset):
"""
Given a job resource request and a set of resources this function tries to find a matching allocation.
Expand All @@ -183,10 +183,10 @@ def no_pref(itvs_slots, hy_res_rqts, hy, beginning_slotset):
:return [ProcSet]: \
The allocation if found, otherwise an empty :class:`procset.ProcSet`
"""
return compact(itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=False)
return compact(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=False)


def f_spread(itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True):
def f_spread(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True):
"""
Given a job resource request and a set of resources this function tries to find a matching allocation.
Expand All @@ -202,18 +202,18 @@ def f_spread(itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True):
The allocation if found, otherwise an empty :class:`procset.ProcSet`
"""
avail_procset = spread(
itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=reverse
session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=reverse
)

# if no allocation space is found (by compact policy)
# fallback to compact/no_pref
if len(avail_procset) == 0:
return compact(itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=reverse)
return compact(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=reverse)
else:
return avail_procset


def f_co_loc(itvs_slots, hy_res_rqts, hy, beginning_slotset):
def f_co_loc(session, itvs_slots, hy_res_rqts, hy, beginning_slotset):
"""
Given a job resource request and a set of resources this function tries to find a matching allocation.
Expand All @@ -228,4 +228,4 @@ def f_co_loc(itvs_slots, hy_res_rqts, hy, beginning_slotset):
:return [ProcSet]: \
The allocation if found, otherwise an empty :class:`procset.ProcSet`
"""
return f_spread(itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=False)
return f_spread(session, itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=False)
2 changes: 2 additions & 0 deletions oar/kao/kamelot.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def internal_schedule_cycle(
# Scheduled
#
schedule_id_jobs_ct(
session,
all_slot_sets,
waiting_jobs,
resource_set.hierarchy,
Expand Down Expand Up @@ -196,6 +197,7 @@ def schedule_cycle(
# Scheduled
#
schedule_id_jobs_ct(
session,
all_slot_sets,
waiting_jobs,
resource_set.hierarchy,
Expand Down
2 changes: 1 addition & 1 deletion oar/kao/kamelot_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def schedule_cycle(session, config, plt, queues=["default"]):
# Scheduled
#
schedule_id_jobs_ct(
all_slot_sets, waiting_jobs, resource_set.hierarchy, waiting_jids, 0
session, all_slot_sets, waiting_jobs, resource_set.hierarchy, waiting_jids, 0
)

#
Expand Down
34 changes: 22 additions & 12 deletions oar/kao/scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Any, Tuple

from procset import ProcSet
from sqlalchemy.orm import Session

from oar.kao.quotas import Quotas
from oar.kao.slot import Slot, SlotSet, intersec_itvs_slots, intersec_ts_ph_itvs_slots
Expand Down Expand Up @@ -82,13 +83,14 @@ def set_slots_with_prev_scheduled_jobs(
slot_set.split_slots_jobs(jobs_slotsets[ss_name])


def find_resource_hierarchies_job(itvs_slots, hy_res_rqts, hy):
def find_resource_hierarchies_job(session, itvs_slots, hy_res_rqts, hy):
"""
Given a job resource request and a set of resources this function tries to find a matching allocation.
.. note::
This` can be override with the oar `extension <../admin/extensions.html#functions-assign-and-find>`_ mechanism.
:param session: The DB session
:param itvs_slots: A procset of the resources available for the allocation
:type itvs_slots: :class:`procset.ProcSet`
:param hy_res_rqts: The job's request
Expand Down Expand Up @@ -118,13 +120,14 @@ def find_resource_hierarchies_job(itvs_slots, hy_res_rqts, hy):


def find_first_suitable_contiguous_slots_quotas(
slots_set: SlotSet, job, res_rqt: Tuple[int, int, Any], hy, min_start_time: int
session: Session, slots_set: SlotSet, job, res_rqt: Tuple[int, int, Any], hy, min_start_time: int
):
"""
Loop through time slices from a :py:class:`oar.kao.slot.SlotSet` that are long enough for the job's walltime.
For each compatible time slice, call the function :py:func:`find_resource_hierarchies_job`
to find compatible resources allocation for the job, if such allocation is found the function ends.
:param Session session: The DB session
:param SlotSet slots_set: Slot set of the current platform
:param Job job: The job to schedule
:param res_rqt: The job resource request
Expand Down Expand Up @@ -189,6 +192,7 @@ def find_first_suitable_contiguous_slots_quotas(
)
# Use specialized find resource function
itvs = job.find_func(
session,
itvs_avail,
hy_res_rqts,
hy,
Expand All @@ -197,7 +201,7 @@ def find_first_suitable_contiguous_slots_quotas(
**job.find_kwargs,
)
else:
itvs = find_resource_hierarchies_job(itvs_avail, hy_res_rqts, hy)
itvs = find_resource_hierarchies_job(session, itvs_avail, hy_res_rqts, hy)

if len(itvs) != 0:
nb_res = len(itvs & ResourceSet.default_itvs)
Expand Down Expand Up @@ -226,13 +230,14 @@ def find_first_suitable_contiguous_slots_quotas(


def find_first_suitable_contiguous_slots_no_quotas(
slots_set: SlotSet, job, res_rqt, hy, min_start_time: int
session: Session, slots_set: SlotSet, job, res_rqt, hy, min_start_time: int
):
"""
Loop through time slices from a :py:class:`oar.kao.slot.SlotSet` that are long enough for the job's walltime.
For each compatible time slice, call the function :py:func:`find_resource_hierarchies_job`
to find compatible resources allocation for the job, if such allocation is found the function ends.
:param Session session: The DB session
:param SlotSet slots_set: Slot set of the current platform
:param Job job: The job to schedule
:param res_rqt: The job resource request
Expand Down Expand Up @@ -276,6 +281,7 @@ def find_first_suitable_contiguous_slots_no_quotas(

if job.find:
itvs = job.find_func(
session,
itvs_avail,
hy_res_rqts,
hy,
Expand All @@ -285,7 +291,7 @@ def find_first_suitable_contiguous_slots_no_quotas(
**job.find_kwargs,
)
else:
itvs = find_resource_hierarchies_job(itvs_avail, hy_res_rqts, hy)
itvs = find_resource_hierarchies_job(session, itvs_avail, hy_res_rqts, hy)

if len(itvs) != 0:
break
Expand All @@ -306,13 +312,14 @@ def find_first_suitable_contiguous_slots_no_quotas(


def find_first_suitable_contiguous_slots(
slots_set: SlotSet, job, res_rqt, hy, min_start_time: int
session: Session, slots_set: SlotSet, job, res_rqt, hy, min_start_time: int
):
"""
Loop through time slices from a :py:class:`oar.kao.slot.SlotSet` that are long enough for the job's walltime.
For each compatible time slice, call the function :py:func:`find_resource_hierarchies_job`
to find compatible resources allocation for the job, if such allocation is found the function ends.
:param Session session: The DB session
:param SlotSet slots_set: Slot set of the current platform
:param Job job: The job to schedule
:param res_rqt: The job resource request
Expand All @@ -322,16 +329,16 @@ def find_first_suitable_contiguous_slots(

if Quotas.enabled and not job.no_quotas:
return find_first_suitable_contiguous_slots_quotas(
slots_set, job, res_rqt, hy, min_start_time
session, slots_set, job, res_rqt, hy, min_start_time
)

return find_first_suitable_contiguous_slots_no_quotas(
slots_set, job, res_rqt, hy, min_start_time
session, slots_set, job, res_rqt, hy, min_start_time
)


def assign_resources_mld_job_split_slots(
slots_set: SlotSet, job: Job, hy, min_start_time
session: Session, slots_set: SlotSet, job: Job, hy, min_start_time
):
"""
According to a resources a :class:`SlotSet` find the time and the resources to launch a job.
Expand All @@ -345,6 +352,7 @@ def assign_resources_mld_job_split_slots(
.. note::
This function can be override with the oar `extension <../admin/extensions.html#functions-assign-and-find>`_ mechanism.
:param Session session: The DB session
:param SlotSet slots_set: A :class:`SlotSet` of the current platform
:param Job job: The job to schedule
:param hy: \
Expand All @@ -362,7 +370,7 @@ def assign_resources_mld_job_split_slots(
for res_rqt in job.mld_res_rqts:
mld_id, walltime, hy_res_rqts = res_rqt
res_set, sid_left, sid_right = find_first_suitable_contiguous_slots(
slots_set, job, res_rqt, hy, min_start_time
session, slots_set, job, res_rqt, hy, min_start_time
)
if len(res_set) == 0: # no suitable time*resources found
res_set_nfound += 1
Expand Down Expand Up @@ -403,11 +411,12 @@ def assign_resources_mld_job_split_slots(
return prev_sid_left, prev_sid_right, job


def schedule_id_jobs_ct(slots_sets, jobs, hy, id_jobs, job_security_time):
def schedule_id_jobs_ct(session, slots_sets, jobs, hy, id_jobs, job_security_time):
"""
Main scheduling loop with support for jobs container - can be recursive (recursion has not been tested)
Find an allocation for each waiting jobs.
:param Session session: The DB session
:param SlotSet slots_sets: A :class:`SlotSet` of the current platform
:param [Job] jobs: The list of the waiting jobs to schedule
:param hy: \
Expand Down Expand Up @@ -480,6 +489,7 @@ def schedule_id_jobs_ct(slots_sets, jobs, hy, id_jobs, job_security_time):
if job.assign:
# Use specialized assign function
job.assign_func(
session,
slots_set,
job,
hy,
Expand All @@ -488,7 +498,7 @@ def schedule_id_jobs_ct(slots_sets, jobs, hy, id_jobs, job_security_time):
**job.assign_kwargs,
)
else:
assign_resources_mld_job_split_slots(slots_set, job, hy, min_start_time)
assign_resources_mld_job_split_slots(session, slots_set, job, hy, min_start_time)

if "container" in job.types:
if job.types["container"] == "":
Expand Down
16 changes: 9 additions & 7 deletions oar/kao/scheduling_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
from typing import Tuple

from procset import ProcSet
from sqlalchemy.orm import Session

from oar.kao.slot import Slot, SlotSet, intersec_itvs_slots
from oar.lib.hierarchy import find_resource_hierarchies_scattered


def find_resource_hierarchies_job(itvs_slots, hy_res_rqts, hy):
def find_resource_hierarchies_job(session, itvs_slots, hy_res_rqts, hy):
"""
Find resources in interval for all resource subrequests of a moldable
instance of a job
Expand All @@ -31,7 +32,7 @@ def find_resource_hierarchies_job(itvs_slots, hy_res_rqts, hy):


def find_first_suitable_contiguous_slots(
slots_set: SlotSet, job, res_rqt, hy
session: Session, slots_set: SlotSet, job, res_rqt, hy
) -> Tuple[ProcSet, int, int]:
"""find first_suitable_contiguous_slot"""
(mld_id, walltime, hy_res_rqts) = res_rqt
Expand All @@ -48,7 +49,7 @@ def find_first_suitable_contiguous_slots(
for slot_begin, slot_end in slots_set.traverse_with_width(walltime):
# find next contiguous slots_time
itvs_avail = intersec_itvs_slots(slots, slot_begin.id, slot_end.id)
itvs = find_resource_hierarchies_job(itvs_avail, hy_res_rqts, hy)
itvs = find_resource_hierarchies_job(session, itvs_avail, hy_res_rqts, hy)

if len(itvs) != 0:
break
Expand All @@ -59,7 +60,7 @@ def find_first_suitable_contiguous_slots(
return (itvs, slot_begin.id, slot_end.id)


def assign_resources_mld_job_split_slots(slots_set: SlotSet, job, hy):
def assign_resources_mld_job_split_slots(session: Session, slots_set: SlotSet, job, hy):
"""
According to a resources a :class:`SlotSet` find the time and the resources to launch a job.
This function supports the moldable jobs. In case of multiple moldable job corresponding to the request
Expand All @@ -71,6 +72,7 @@ def assign_resources_mld_job_split_slots(slots_set: SlotSet, job, hy):
One can replace this function using an extension of OAR.
:param Session session: The DB session
:param ProcSet slots_set: A :class:`SlotSet` of the current platform
:param [Job] job: The job to schedule
:param hy: \
Expand All @@ -86,7 +88,7 @@ def assign_resources_mld_job_split_slots(slots_set: SlotSet, job, hy):
for res_rqt in job.mld_res_rqts:
(mld_id, walltime, hy_res_rqts) = res_rqt
(res_set, sid_left, sid_right) = find_first_suitable_contiguous_slots(
slots_set, job, res_rqt, hy
session, slots_set, job, res_rqt, hy
)
# print("after find fisrt suitable")
t_finish = slots[sid_left].b + walltime
Expand All @@ -113,7 +115,7 @@ def assign_resources_mld_job_split_slots(slots_set: SlotSet, job, hy):
slots_set.split_slots(prev_sid_left, prev_sid_right, job)


def schedule_id_jobs_ct(slots_sets, jobs, hy, id_jobs, security_time):
def schedule_id_jobs_ct(session, slots_sets, jobs, hy, id_jobs, security_time):
"""
Schedule loop with support for jobs container - can be recursive
(recursivity has not be tested)
Expand All @@ -133,7 +135,7 @@ def schedule_id_jobs_ct(slots_sets, jobs, hy, id_jobs, security_time):

# slots_set.show_slots()

assign_resources_mld_job_split_slots(slots_set, job, hy)
assign_resources_mld_job_split_slots(session, slots_set, job, hy)

if "container" in job.types:
Slot(
Expand Down

0 comments on commit 1962ae3

Please sign in to comment.