Skip to content

Commit

Permalink
LocalEngine: replace input queue with list to keep fifo scheduling behaviour
Browse files Browse the repository at this point in the history
  • Loading branch information
dthulke committed Apr 21, 2024
1 parent 7f2747f commit 43470da
Showing 1 changed file with 13 additions and 18 deletions.
31 changes: 13 additions & 18 deletions sisyphus/localengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def start_engine(self):
if self.started:
return
# control input
self.input_queue = queue.Queue()
self.runnable_tasks = sync_object([])
self.waiting_tasks = sync_object({})

# control output / which tasks are currently running
Expand Down Expand Up @@ -194,24 +194,22 @@ def release_resources(self, rqmt, selected_devices):

@tools.default_handle_exception_interrupt_main_thread
def run(self):
checked_tasks_since_last_wait = 0
try:
while self.running.value:
self.check_finished_tasks()

wait = True # wait if no new job is started
# get next task
logging.debug("Check for new task (Free resources %s)" % self.free_resources)
with self.waiting_tasks as waiting_tasks: # get object for synchronisation
next_task = None
if not self.input_queue.empty():
next_task = self.input_queue.get()
logging.debug("Found new task: %s" % str(next_task))
# get object for synchronisation
with self.waiting_tasks as waiting_tasks, self.runnable_tasks as runnable_tasks:
total_runnable_tasks = len(runnable_tasks)
runnable_task_idx = 0

# run next task if the capacities are available
if next_task is not None:
while runnable_task_idx < total_runnable_tasks:
next_task = runnable_tasks[runnable_task_idx]
with self.running_tasks as running_tasks:
wait = False
# if enough free resources => run job
if self.enough_free_resources(next_task.rqmt):
selected_gpus = self.reserve_resources(next_task.rqmt)
Expand All @@ -227,19 +225,16 @@ def run(self):
# Start job:
process = self.start_task(next_task, selected_gpus)
running_tasks[name] = (process, next_task, selected_gpus)
del runnable_tasks[runnable_task_idx]
total_runnable_tasks -= 1
wait = False
else:
# Put next_task at end of queue and try to schedule the next one
checked_tasks_since_last_wait += 1
if checked_tasks_since_last_wait > self.input_queue.qsize():
# Wait if this is the only task in the queue
wait = True
self.input_queue.put(next_task)
runnable_task_idx += 1

if wait:
# check only once per second for new jobs
# if no job has been started
time.sleep(1)
checked_tasks_since_last_wait = 0
except KeyboardInterrupt:
# KeyboardInterrupt is handled in manager
pass
Expand All @@ -263,8 +258,8 @@ def submit_call(self, call, logpath, rqmt, name, task_name, task_ids):
call_with_id += ["--redirect_output"]

task = TaskQueueInstance(call_with_id, logpath, rqmt, name, task_name, task_id)
with self.waiting_tasks as waiting_tasks:
self.input_queue.put(task)
with self.waiting_tasks as waiting_tasks, self.runnable_tasks as runnable_tasks:
runnable_tasks.append(task)
waiting_tasks[(name, task_id)] = task
return ENGINE_NAME, socket.gethostname()

Expand Down

0 comments on commit 43470da

Please sign in to comment.