Skip to content

Commit

Permalink
adding relative start time of the task
Browse files Browse the repository at this point in the history
  • Loading branch information
ShreyaTalati committed Nov 24, 2023
1 parent 68e93f2 commit ca41ad3
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 24 deletions.
3 changes: 2 additions & 1 deletion src/c/backend/include/containers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -475,10 +475,11 @@ template <typename T> class ProtectedQueue {
inline bool empty_unsafe() { return this->q.empty(); }
};

// Create a min heap
class Compare {
public:
bool operator()(T a, T b){

Check failure on line 481 in src/c/backend/include/containers.hpp

View workflow job for this annotation

GitHub Actions / cpp-linter

/src/c/backend/include/containers.hpp:481:24 [clang-diagnostic-error]

unknown type name 'T'

Check failure on line 481 in src/c/backend/include/containers.hpp

View workflow job for this annotation

GitHub Actions / cpp-linter

/src/c/backend/include/containers.hpp:481:29 [clang-diagnostic-error]

unknown type name 'T'
if(a.priority >= b.priority) {
if(a.priority < b.priority) {
return true; // the order is correct and NO swapping of elements takes place
}
return false; // the order is NOT correct and swapping of elements takes place
Expand Down
32 changes: 20 additions & 12 deletions src/c/backend/include/device_queues.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

struct taskStructure { // Structure declaration
int priority; // Member (int variable)
int wait_time;
InnerTask *task; // Member (string variable)
};
// TODO(wlr): FIXME Change these back to smart pointers. I'm leaking memory
Expand Down Expand Up @@ -40,23 +41,30 @@ template <typename ResourceCategory> class DeviceQueue {
Device *get_device() { return device; }

/**
* Enqueues a task on this device.
* @param task the task to enqueue
* Calculates priority for the task. Lower the priority, earlier it should be scheduled.
* @param task the task to calculate the priority for
* @return the priority number of the task
*/

int determine_priority(InnerTask *task) {
int num_dependents = task->dependents.size(); // directly propotional
int num_gpus_required = task->assigned_devices.size(); // inveresly propotional
int start_time = time(NULL); // directly propotional
priority = (num_dependents * start_time) / num_gpus_required; // normalize and change this
int determine_priority(InnerTask *task, int global_start_time) {
int num_dependents = task->dependents.size(); // inveresly propotional -> more the # of dependents, earlier it should be scheduled
int num_gpus_required = task->assigned_devices.size(); // directly propotional -> more the # of GPUs req, later it should be scheduled
int relative_start_time = time(NULL) - global_start_time; // task coming later, should be later in the queue
priority = relative_start_time + (num_gpus_required / num_dependents); // normalize and change this
return priority;
// critical path length to most recently spawned task
// estimated completion time

}
void enqueue(InnerTask *task) {

/**
* Enqueues a task on this device.
* @param task the task to enqueue
*/


void enqueue(InnerTask *task, int global_start_time) {
taskStructure new_task;
new_task.priority = task->determine_priority(task);
new_task.priority = task->determine_priority(task, global_start_time);
new_task.task = task;
// std::cout << "DeviceQueue::enqueue() - " << task->get_name() <<
// std::endl;
Expand Down Expand Up @@ -228,15 +236,15 @@ template <typename ResourceCategory> class PhaseManager {
* Enqueues a task to the appropriate DeviceQueue(s).
* @param task the task to enqueue. May be single or multi-device.
**/
void enqueue(InnerTask *task) {
void enqueue(InnerTask *task, int global_start_time) {
// std::cout << "pointer: " << reinterpret_cast<void *>(this) << std::endl;
// std::cout << "ndevices: " << this->ndevices << std::endl;
// std::cout << "nqueues: " << this->device_queues.size() << std::endl;
// std::cout << "Enqueuing task to phase manager: " << task->get_name()
// << std::endl;
task->set_num_instances<ResourceCategory>();
for (auto device : task->assigned_devices) {
this->device_queues[device->get_global_id()]->enqueue(task);
this->device_queues[device->get_global_id()]->enqueue(task, global_start_time);
}
this->num_tasks++;
}
Expand Down
2 changes: 1 addition & 1 deletion src/c/backend/include/runtime.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1048,7 +1048,7 @@ class InnerScheduler {
void activate_wrapper();

/*Spawn a Task (increment active, set state, possibly enqueue)*/
void spawn_task(InnerTask *task);
void spawn_task(InnerTask *task, int global_start_time);

/* Enqueue task. */
void enqueue_task(InnerTask *task, TaskStatusFlags flags);
Expand Down
4 changes: 2 additions & 2 deletions src/c/backend/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ void InnerScheduler::spawn_task(InnerTask *task) {
this->enqueue_task(task, status);
}

void InnerScheduler::enqueue_task(InnerTask *task, TaskStatusFlags status) {
void InnerScheduler::enqueue_task(InnerTask *task, TaskStatusFlags status, int global_start_time) {
// TODO: Change this to appropriate phase as it becomes implemented
LOG_INFO(SCHEDULER, "Enqueing task: {}, Status: {}", task, status);
if (status.mappable && (task->get_state() < TaskState::MAPPED)) {
Expand All @@ -238,7 +238,7 @@ void InnerScheduler::enqueue_task(InnerTask *task, TaskStatusFlags status) {
} else if (status.reservable && (task->get_state() == TaskState::MAPPED)) {
task->set_status(TaskStatus::RESERVABLE);
LOG_INFO(SCHEDULER, "Enqueing task: {} to memory reserver", task);
this->memory_reserver->enqueue(task);
this->memory_reserver->enqueue(task, global_start_time);
} else if (status.runnable && (task->get_state() == TaskState::RESERVED)) {
task->set_status(TaskStatus::RUNNABLE);
//std::cout << "ENQUEUE FROM CALLBACK" << std::endl;
Expand Down
3 changes: 2 additions & 1 deletion src/python/parla/common/globals.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from enum import IntEnum
import threading
import os
import time

try:
import cupy
Expand Down Expand Up @@ -45,7 +46,7 @@


VCU_BASELINE = 1000

GLOBAL_START_TIME = time.time()

class SynchronizationType(IntEnum):
"""
Expand Down
3 changes: 2 additions & 1 deletion src/python/parla/common/spawn.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
SynchronizationType,
crosspy,
CROSSPY_ENABLED,
GLOBAL_START_TIME
)
import inspect
from typing import Collection, Any, Union, List, Tuple
Expand Down Expand Up @@ -161,7 +162,7 @@ def decorator(body):
runahead=runahead,
)
try:
scheduler.spawn_task(task)
scheduler.spawn_task(task, GLOBAL_START_TIME)
except RuntimeError:
raise RuntimeError(
"Conflicting task state while spawning task. Possible duplicate TaskID: "
Expand Down
2 changes: 1 addition & 1 deletion src/python/parla/cython/core.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ cdef extern from "include/runtime.hpp" nogil:

void activate_wrapper()

void spawn_task(InnerTask* task) except +
void spawn_task(InnerTask* task, int global_start_time) except +

void add_worker(InnerWorker* worker)
void enqueue_worker(InnerWorker* worker)
Expand Down
4 changes: 2 additions & 2 deletions src/python/parla/cython/core.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -603,11 +603,11 @@ cdef class PyInnerScheduler:
cdef InnerScheduler* c_self = self.inner_scheduler
c_self.activate_wrapper()

cpdef spawn_task(self, PyInnerTask task):
cpdef spawn_task(self, PyInnerTask task, int global_start_time):
cdef InnerScheduler* c_self = self.inner_scheduler
cdef InnerTask* c_task = task.c_task

c_self.spawn_task(c_task)
c_self.spawn_task(c_task, global_start_time)

cpdef add_worker(self, PyInnerWorker worker):
cdef InnerScheduler* c_self = self.inner_scheduler
Expand Down
6 changes: 3 additions & 3 deletions src/python/parla/cython/scheduler.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import inspect
from ..common.globals import DeviceType, cupy, CUPY_ENABLED
from ..common.globals import SynchronizationType as SyncType
from ..common.globals import _Locals as Locals
from ..common.globals import USE_PYTHON_RUNAHEAD, _global_data_tasks, PREINIT_THREADS
from ..common.globals import USE_PYTHON_RUNAHEAD, _global_data_tasks, PREINIT_THREADS, GLOBAL_START_TIME

if cupy is not None:
import cupy_backends
Expand Down Expand Up @@ -485,8 +485,8 @@ class Scheduler(ControllableThread, SchedulerContext):
for w in self.worker_threads:
w.stop()

def spawn_task(self, task):
self.inner_scheduler.spawn_task(task.inner_task)
def spawn_task(self, task, global_start_time):
self.inner_scheduler.spawn_task(task.inner_task, global_start_time)

def assign_task(self, task, worker):
task.state = tasks.TaskRunning(task.func, task.args, task.dependencies)
Expand Down

0 comments on commit ca41ad3

Please sign in to comment.